1use async_trait::async_trait;
2use base64::Engine;
3use chrono::Utc;
4use http::StatusCode;
5use serde_json::Value;
6use std::collections::HashMap;
7
8use fakecloud_aws::arn::Arn;
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11
12use crate::state::{
13 MessageAttribute, PlatformApplication, PlatformEndpoint, PublishedMessage, SharedSnsState,
14 SnsSubscription, SnsTopic,
15};
16
/// SNS service handler.
///
/// Bundles the shared SNS state with the delivery bus that the publish
/// fan-out helpers use to reach SQS queues and Lambda functions.
pub struct SnsService {
    /// Shared SNS state; handlers access it through `read()` / `write()` guards.
    state: SharedSnsState,
    /// Delivery bus used for SQS sends and Lambda invocations during fan-out.
    delivery: Arc<DeliveryBus>,
}
21
22impl SnsService {
23 pub fn new(state: SharedSnsState, delivery: Arc<DeliveryBus>) -> Self {
24 Self { state, delivery }
25 }
26}
27
28use std::sync::Arc;
29
/// Maximum number of entries returned in one page of a list response
/// (used by `list_topics` to decide when to emit a `NextToken`).
const DEFAULT_PAGE_SIZE: usize = 100;

/// JSON document reported as the `EffectiveDeliveryPolicy` attribute by
/// `get_topic_attributes`.
const DEFAULT_EFFECTIVE_DELIVERY_POLICY: &str = r#"{"defaultHealthyRetryPolicy":{"numNoDelayRetries":0,"numMinDelayRetries":0,"minDelayTarget":20,"maxDelayTarget":20,"numMaxDelayRetries":0,"numRetries":3,"backoffFunction":"linear"},"sicklyRetryPolicy":null,"throttlePolicy":null,"guaranteed":false}"#;
33
34fn default_policy(topic_arn: &str, account_id: &str) -> String {
35 serde_json::json!({
36 "Version": "2008-10-17",
37 "Id": "__default_policy_ID",
38 "Statement": [{
39 "Effect": "Allow",
40 "Sid": "__default_statement_ID",
41 "Principal": {"AWS": "*"},
42 "Action": [
43 "SNS:GetTopicAttributes",
44 "SNS:SetTopicAttributes",
45 "SNS:AddPermission",
46 "SNS:RemovePermission",
47 "SNS:DeleteTopic",
48 "SNS:Subscribe",
49 "SNS:ListSubscriptionsByTopic",
50 "SNS:Publish",
51 ],
52 "Resource": topic_arn,
53 "Condition": {"StringEquals": {"AWS:SourceOwner": account_id}},
54 }]
55 })
56 .to_string()
57}
58
/// SNS action names (without the `SNS:` prefix) accepted as valid.
///
/// NOTE(review): no use of this table is visible in this part of the file;
/// presumably it validates action names supplied to `AddPermission` — confirm
/// against the handler.
const VALID_SNS_ACTIONS: &[&str] = &[
    "GetTopicAttributes",
    "SetTopicAttributes",
    "AddPermission",
    "RemovePermission",
    "DeleteTopic",
    "Subscribe",
    "ListSubscriptionsByTopic",
    "Publish",
    "Receive",
];
70
/// Subscription attribute names accepted by `subscribe`; any other key in the
/// request's `Attributes` is rejected with an `InvalidParameter` error.
const VALID_SUBSCRIPTION_ATTRS: &[&str] = &[
    "RawMessageDelivery",
    "DeliveryPolicy",
    "FilterPolicy",
    "FilterPolicyScope",
    "RedrivePolicy",
    "SubscriptionRoleArn",
];
79
80#[async_trait]
81impl AwsService for SnsService {
82 fn service_name(&self) -> &str {
83 "sns"
84 }
85
86 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
87 match req.action.as_str() {
88 "CreateTopic" => self.create_topic(&req),
89 "DeleteTopic" => self.delete_topic(&req),
90 "ListTopics" => self.list_topics(&req),
91 "GetTopicAttributes" => self.get_topic_attributes(&req),
92 "SetTopicAttributes" => self.set_topic_attributes(&req),
93 "Subscribe" => self.subscribe(&req),
94 "ConfirmSubscription" => self.confirm_subscription(&req),
95 "Unsubscribe" => self.unsubscribe(&req),
96 "Publish" => self.publish(&req),
97 "PublishBatch" => self.publish_batch(&req),
98 "ListSubscriptions" => self.list_subscriptions(&req),
99 "ListSubscriptionsByTopic" => self.list_subscriptions_by_topic(&req),
100 "GetSubscriptionAttributes" => self.get_subscription_attributes(&req),
101 "SetSubscriptionAttributes" => self.set_subscription_attributes(&req),
102 "TagResource" => self.tag_resource(&req),
103 "UntagResource" => self.untag_resource(&req),
104 "ListTagsForResource" => self.list_tags_for_resource(&req),
105 "AddPermission" => self.add_permission(&req),
106 "RemovePermission" => self.remove_permission(&req),
107 "CreatePlatformApplication" => self.create_platform_application(&req),
109 "DeletePlatformApplication" => self.delete_platform_application(&req),
110 "GetPlatformApplicationAttributes" => self.get_platform_application_attributes(&req),
111 "SetPlatformApplicationAttributes" => self.set_platform_application_attributes(&req),
112 "ListPlatformApplications" => self.list_platform_applications(&req),
113 "CreatePlatformEndpoint" => self.create_platform_endpoint(&req),
114 "DeleteEndpoint" => self.delete_endpoint(&req),
115 "GetEndpointAttributes" => self.get_endpoint_attributes(&req),
116 "SetEndpointAttributes" => self.set_endpoint_attributes(&req),
117 "ListEndpointsByPlatformApplication" => {
118 self.list_endpoints_by_platform_application(&req)
119 }
120 "SetSMSAttributes" => self.set_sms_attributes(&req),
122 "GetSMSAttributes" => self.get_sms_attributes(&req),
123 "CheckIfPhoneNumberIsOptedOut" => self.check_if_phone_number_is_opted_out(&req),
124 "ListPhoneNumbersOptedOut" => self.list_phone_numbers_opted_out(&req),
125 "OptInPhoneNumber" => self.opt_in_phone_number(&req),
126 _ => Err(AwsServiceError::action_not_implemented("sns", &req.action)),
127 }
128 }
129
130 fn supported_actions(&self) -> &[&str] {
131 SNS_SUPPORTED_ACTIONS
132 }
133
134 fn iam_enforceable(&self) -> bool {
135 true
136 }
137
138 fn iam_action_for(&self, request: &AwsRequest) -> Option<fakecloud_core::auth::IamAction> {
144 let action = SNS_SUPPORTED_ACTIONS
145 .iter()
146 .copied()
147 .find(|a| *a == request.action)?;
148 let resource = match action {
149 "CreateTopic" => {
150 let state = self.state.read();
157 let partition = if request.region.starts_with("cn-") {
158 "aws-cn"
159 } else if request.region.starts_with("us-gov-") {
160 "aws-us-gov"
161 } else {
162 "aws"
163 };
164 param(request, "Name")
165 .map(|n| {
166 format!(
167 "arn:{}:sns:{}:{}:{}",
168 partition, request.region, state.account_id, n
169 )
170 })
171 .unwrap_or_else(|| "*".to_string())
172 }
173 "DeleteTopic"
174 | "GetTopicAttributes"
175 | "SetTopicAttributes"
176 | "Subscribe"
177 | "Publish"
178 | "PublishBatch"
179 | "ListSubscriptionsByTopic"
180 | "AddPermission"
181 | "RemovePermission"
182 | "ConfirmSubscription" => {
187 param(request, "TopicArn").unwrap_or_else(|| "*".to_string())
188 }
189 "Unsubscribe" | "GetSubscriptionAttributes" | "SetSubscriptionAttributes" => {
190 param(request, "SubscriptionArn").unwrap_or_else(|| "*".to_string())
191 }
192 "TagResource" | "UntagResource" | "ListTagsForResource" => {
193 param(request, "ResourceArn").unwrap_or_else(|| "*".to_string())
194 }
195 "DeletePlatformApplication"
196 | "GetPlatformApplicationAttributes"
197 | "SetPlatformApplicationAttributes"
198 | "CreatePlatformEndpoint"
199 | "ListEndpointsByPlatformApplication" => {
200 param(request, "PlatformApplicationArn").unwrap_or_else(|| "*".to_string())
201 }
202 "DeleteEndpoint" | "GetEndpointAttributes" | "SetEndpointAttributes" => {
203 param(request, "EndpointArn").unwrap_or_else(|| "*".to_string())
204 }
205 _ => "*".to_string(),
206 };
207 Some(fakecloud_core::auth::IamAction {
208 service: "sns",
209 action,
210 resource,
211 })
212 }
213}
214
/// Action names returned by `supported_actions` and used by `iam_action_for`
/// to recognise a request's action.
const SNS_SUPPORTED_ACTIONS: &[&str] = &[
    "CreateTopic",
    "DeleteTopic",
    "ListTopics",
    "GetTopicAttributes",
    "SetTopicAttributes",
    "Subscribe",
    "ConfirmSubscription",
    "Unsubscribe",
    "Publish",
    "PublishBatch",
    "ListSubscriptions",
    "ListSubscriptionsByTopic",
    "GetSubscriptionAttributes",
    "SetSubscriptionAttributes",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "AddPermission",
    "RemovePermission",
    "CreatePlatformApplication",
    "DeletePlatformApplication",
    "GetPlatformApplicationAttributes",
    "SetPlatformApplicationAttributes",
    "ListPlatformApplications",
    "CreatePlatformEndpoint",
    "DeleteEndpoint",
    "GetEndpointAttributes",
    "SetEndpointAttributes",
    "ListEndpointsByPlatformApplication",
    "SetSMSAttributes",
    "GetSMSAttributes",
    "CheckIfPhoneNumberIsOptedOut",
    "ListPhoneNumbersOptedOut",
    "OptInPhoneNumber",
];
251
252fn param(req: &AwsRequest, name: &str) -> Option<String> {
254 if let Some(v) = req.query_params.get(name) {
256 return Some(v.clone());
257 }
258 if let Ok(body) = serde_json::from_slice::<Value>(&req.body) {
260 if let Some(s) = body[name].as_str() {
261 return Some(s.to_string());
262 }
263 }
264 None
265}
266
267fn required(req: &AwsRequest, name: &str) -> Result<String, AwsServiceError> {
268 param(req, name).ok_or_else(|| {
269 AwsServiceError::aws_error(
270 StatusCode::BAD_REQUEST,
271 "InvalidParameter",
272 format!("The request must contain the parameter {name}"),
273 )
274 })
275}
276
277fn validate_message_structure_json(message: &str) -> Result<(), AwsServiceError> {
278 let parsed: Value = serde_json::from_str(message).map_err(|_| {
279 AwsServiceError::aws_error(
280 StatusCode::BAD_REQUEST,
281 "InvalidParameter",
282 "Invalid parameter: Message Structure - No JSON message body is parseable",
283 )
284 })?;
285 if parsed.get("default").is_none() {
286 return Err(AwsServiceError::aws_error(
287 StatusCode::BAD_REQUEST,
288 "InvalidParameter",
289 "Invalid parameter: Message Structure - No default entry in JSON message body",
290 ));
291 }
292 Ok(())
293}
294
295fn not_found(entity: &str) -> AwsServiceError {
296 AwsServiceError::aws_error(
297 StatusCode::NOT_FOUND,
298 "NotFound",
299 format!("{entity} does not exist"),
300 )
301}
302
/// Returns the fourth `:`-separated field of `arn` (the region slot of an
/// ARN), or `None` when the string has fewer than four fields.
fn arn_region(arn: &str) -> Option<&str> {
    arn.split(':').nth(3)
}
312
313fn xml_resp(inner: &str, _request_id: &str) -> AwsResponse {
315 let xml = format!("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n{inner}\n");
316 AwsResponse::xml(StatusCode::OK, xml)
317}
318
/// Error text returned by `validate_topic_name` for an invalid FIFO topic name.
const FIFO_NAME_ERROR: &str = "Fifo Topic names must end with .fifo and must be made up of only uppercase and lowercase ASCII letters, numbers, underscores, and hyphens, and must be between 1 and 256 characters long.";
/// Error text returned by `validate_topic_name` for an invalid standard topic name.
const STANDARD_NAME_ERROR: &str = "Topic names must be made up of only uppercase and lowercase ASCII letters, numbers, underscores, and hyphens, and must be between 1 and 256 characters long.";
321
322fn validate_topic_name(name: &str, is_fifo_attr: bool) -> Result<(), AwsServiceError> {
324 if name.is_empty() || name.len() > 256 {
325 return Err(AwsServiceError::aws_error(
326 StatusCode::BAD_REQUEST,
327 "InvalidParameter",
328 STANDARD_NAME_ERROR,
329 ));
330 }
331
332 let base_name = name.strip_suffix(".fifo").unwrap_or(name);
333 let valid_chars = base_name
334 .chars()
335 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_');
336
337 if !valid_chars {
338 let msg = if name.ends_with(".fifo") || is_fifo_attr {
339 FIFO_NAME_ERROR
340 } else {
341 STANDARD_NAME_ERROR
342 };
343 return Err(AwsServiceError::aws_error(
344 StatusCode::BAD_REQUEST,
345 "InvalidParameter",
346 msg,
347 ));
348 }
349
350 if is_fifo_attr && !name.ends_with(".fifo") {
352 return Err(AwsServiceError::aws_error(
353 StatusCode::BAD_REQUEST,
354 "InvalidParameter",
355 FIFO_NAME_ERROR,
356 ));
357 }
358
359 if name.ends_with(".fifo") && !is_fifo_attr {
360 return Err(AwsServiceError::aws_error(
361 StatusCode::BAD_REQUEST,
362 "InvalidParameter",
363 STANDARD_NAME_ERROR,
364 ));
365 }
366
367 Ok(())
368}
369
370impl SnsService {
371 fn create_topic(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
372 let name = required(req, "Name")?;
373
374 let topic_attrs = parse_entries(req, "Attributes");
376 let is_fifo_attr = topic_attrs
377 .get("FifoTopic")
378 .map(|v| v.eq_ignore_ascii_case("true"))
379 .unwrap_or(false);
380 let is_fifo = name.ends_with(".fifo");
381
382 validate_topic_name(&name, is_fifo_attr)?;
383
384 let tags = parse_tags(req);
386
387 let mut state = self.state.write();
388 let topic_arn = Arn::new("sns", &req.region, &state.account_id, &name).to_string();
389
390 if !state.topics.contains_key(&topic_arn) {
391 let mut attributes = HashMap::new();
392 attributes.insert(
394 "Policy".to_string(),
395 default_policy(&topic_arn, &state.account_id),
396 );
397 attributes.insert("DisplayName".to_string(), String::new());
398 attributes.insert("DeliveryPolicy".to_string(), String::new());
399
400 if is_fifo {
401 attributes.insert("FifoTopic".to_string(), "true".to_string());
402 attributes.insert("ContentBasedDeduplication".to_string(), "false".to_string());
403 }
404
405 for (k, v) in &topic_attrs {
407 if k == "FifoTopic" || k == "ContentBasedDeduplication" {
409 let normalized = if v.eq_ignore_ascii_case("true") {
410 "true"
411 } else {
412 "false"
413 };
414 if k == "FifoTopic" && normalized == "false" {
415 attributes.remove("FifoTopic");
416 attributes.remove("ContentBasedDeduplication");
417 continue;
418 }
419 attributes.insert(k.clone(), normalized.to_string());
420 continue;
421 }
422 attributes.insert(k.clone(), v.clone());
423 }
424
425 let topic = SnsTopic {
426 topic_arn: topic_arn.clone(),
427 name,
428 attributes,
429 tags,
430 is_fifo,
431 created_at: Utc::now(),
432 };
433 state.topics.insert(topic_arn.clone(), topic);
434 }
435
436 Ok(xml_resp(
437 &format!(
438 r#"<CreateTopicResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
439 <CreateTopicResult>
440 <TopicArn>{topic_arn}</TopicArn>
441 </CreateTopicResult>
442 <ResponseMetadata>
443 <RequestId>{}</RequestId>
444 </ResponseMetadata>
445</CreateTopicResponse>"#,
446 req.request_id
447 ),
448 &req.request_id,
449 ))
450 }
451
452 fn delete_topic(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
453 let topic_arn = required(req, "TopicArn")?;
454 let mut state = self.state.write();
455 state.topics.remove(&topic_arn);
456 state
457 .subscriptions
458 .retain(|_, sub| sub.topic_arn != topic_arn);
459
460 Ok(xml_resp(
461 &format!(
462 r#"<DeleteTopicResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
463 <ResponseMetadata>
464 <RequestId>{}</RequestId>
465 </ResponseMetadata>
466</DeleteTopicResponse>"#,
467 req.request_id
468 ),
469 &req.request_id,
470 ))
471 }
472
473 fn list_topics(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
474 let state = self.state.read();
475
476 let all_topics: Vec<&SnsTopic> = state
478 .topics
479 .values()
480 .filter(|t| {
481 let parts: Vec<&str> = t.topic_arn.split(':').collect();
483 parts.len() >= 4 && parts[3] == req.region
484 })
485 .collect();
486
487 let next_token = param(req, "NextToken")
488 .and_then(|t| t.parse::<usize>().ok())
489 .unwrap_or(0);
490 let next_token = next_token.min(all_topics.len());
491
492 let page = &all_topics[next_token..];
493 let has_more = page.len() > DEFAULT_PAGE_SIZE;
494 let page = if has_more {
495 &page[..DEFAULT_PAGE_SIZE]
496 } else {
497 page
498 };
499
500 let members: String = page
501 .iter()
502 .map(|t| {
503 format!(
504 " <member><TopicArn>{}</TopicArn></member>",
505 t.topic_arn
506 )
507 })
508 .collect::<Vec<_>>()
509 .join("\n");
510
511 let next_token_xml = if has_more {
512 format!(
513 "\n <NextToken>{}</NextToken>",
514 next_token + DEFAULT_PAGE_SIZE
515 )
516 } else {
517 String::new()
518 };
519
520 Ok(xml_resp(
521 &format!(
522 r#"<ListTopicsResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
523 <ListTopicsResult>
524 <Topics>
525{members}
526 </Topics>{next_token_xml}
527 </ListTopicsResult>
528 <ResponseMetadata>
529 <RequestId>{}</RequestId>
530 </ResponseMetadata>
531</ListTopicsResponse>"#,
532 req.request_id
533 ),
534 &req.request_id,
535 ))
536 }
537
538 fn get_topic_attributes(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
539 let topic_arn = required(req, "TopicArn")?;
540
541 if let Some(topic_region) = arn_region(&topic_arn) {
543 if topic_region != req.region {
544 return Err(not_found("Topic"));
545 }
546 }
547
548 let state = self.state.read();
549 let topic = state
550 .topics
551 .get(&topic_arn)
552 .ok_or_else(|| not_found("Topic"))?;
553
554 let subs_confirmed = state
555 .subscriptions
556 .values()
557 .filter(|s| s.topic_arn == topic_arn && s.confirmed)
558 .count();
559 let subs_pending = state
560 .subscriptions
561 .values()
562 .filter(|s| s.topic_arn == topic_arn && !s.confirmed)
563 .count();
564
565 let mut entries = vec![
566 format_attr("TopicArn", &topic.topic_arn),
567 format_attr("Owner", &state.account_id),
568 format_attr("SubscriptionsConfirmed", &subs_confirmed.to_string()),
569 format_attr("SubscriptionsPending", &subs_pending.to_string()),
570 format_attr("SubscriptionsDeleted", "0"),
571 ];
572
573 entries.push(format_attr(
575 "EffectiveDeliveryPolicy",
576 DEFAULT_EFFECTIVE_DELIVERY_POLICY,
577 ));
578
579 for (k, v) in &topic.attributes {
581 entries.push(format_attr(k, v));
582 }
583
584 let attrs = entries.join("\n");
585 Ok(xml_resp(
586 &format!(
587 r#"<GetTopicAttributesResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
588 <GetTopicAttributesResult>
589 <Attributes>
590{attrs}
591 </Attributes>
592 </GetTopicAttributesResult>
593 <ResponseMetadata>
594 <RequestId>{}</RequestId>
595 </ResponseMetadata>
596</GetTopicAttributesResponse>"#,
597 req.request_id
598 ),
599 &req.request_id,
600 ))
601 }
602
603 fn set_topic_attributes(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
604 let topic_arn = required(req, "TopicArn")?;
605 let attr_name = required(req, "AttributeName")?;
606 let attr_value = param(req, "AttributeValue").unwrap_or_default();
607
608 let mut state = self.state.write();
609 let topic = state
610 .topics
611 .get_mut(&topic_arn)
612 .ok_or_else(|| not_found("Topic"))?;
613
614 if attr_name == "Policy" {
616 if let Ok(parsed) = serde_json::from_str::<Value>(&attr_value) {
617 if let Ok(compact) = serde_json::to_string(&parsed) {
618 topic.attributes.insert(attr_name, compact);
619 } else {
620 topic.attributes.insert(attr_name, attr_value);
621 }
622 } else {
623 topic.attributes.insert(attr_name, attr_value);
624 }
625 } else {
626 topic.attributes.insert(attr_name, attr_value);
627 }
628
629 Ok(xml_resp(
630 &format!(
631 r#"<SetTopicAttributesResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
632 <ResponseMetadata>
633 <RequestId>{}</RequestId>
634 </ResponseMetadata>
635</SetTopicAttributesResponse>"#,
636 req.request_id
637 ),
638 &req.request_id,
639 ))
640 }
641
    /// Handles `Subscribe`.
    ///
    /// Validates the topic, the protocol-specific endpoint, and the supplied
    /// subscription attributes, then records a new subscription. If a
    /// subscription with the same topic, protocol and endpoint already
    /// exists, its ARN is returned and nothing is created.
    ///
    /// `http` / `https` subscriptions start unconfirmed: they get a
    /// confirmation token and the response reports `pending confirmation`
    /// instead of the ARN. Every other protocol is confirmed immediately.
    ///
    /// # Errors
    /// `NotFound` when the topic does not exist; `InvalidParameter` for an
    /// unknown platform endpoint, an invalid SMS or SQS endpoint, a FIFO queue
    /// on a standard topic, an unknown attribute name, or (as reported by
    /// `validate_filter_policy`) an invalid filter policy.
    fn subscribe(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let topic_arn = required(req, "TopicArn")?;
        let protocol = required(req, "Protocol")?;
        let endpoint = param(req, "Endpoint").unwrap_or_default();

        // Read-only phase: look up the topic and copy what is needed so the
        // read guard can be released before the write guard is taken below.
        let state_r = self.state.read();
        let topic = state_r
            .topics
            .get(&topic_arn)
            .ok_or_else(|| not_found("Topic"))?;
        let is_fifo_topic = topic.is_fifo;
        let account_id = state_r.account_id.clone();

        if protocol == "application" {
            // The endpoint must be registered under some platform application.
            let endpoint_exists = state_r
                .platform_applications
                .values()
                .any(|app| app.endpoints.contains_key(&endpoint));
            if !endpoint_exists {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "InvalidParameter",
                    format!(
                        "Invalid parameter: Endpoint Reason: Endpoint does not exist for endpoint {endpoint}"
                    ),
                ));
            }
        }
        drop(state_r);

        if protocol == "sms" {
            validate_sms_endpoint(&endpoint)?;
        }

        // SQS endpoints must be queue ARNs.
        if protocol == "sqs" && !endpoint.starts_with("arn:aws:sqs:") {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "InvalidParameter",
                "Invalid parameter: SQS endpoint ARN",
            ));
        }

        // A FIFO queue may only be subscribed to a FIFO topic.
        if protocol == "sqs" && endpoint.ends_with(".fifo") && !is_fifo_topic {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "InvalidParameter",
                "Invalid parameter: Invalid parameter: Endpoint Reason: FIFO SQS Queues can not be subscribed to standard SNS topics",
            ));
        }

        let sub_attrs = parse_entries(req, "Attributes");

        // Reject attribute names outside the supported set.
        for key in sub_attrs.keys() {
            if !VALID_SUBSCRIPTION_ATTRS.contains(&key.as_str()) {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "InvalidParameter",
                    format!("Invalid parameter: Attributes Reason: Unknown attribute: {key}"),
                ));
            }
        }

        let mut attributes = sub_attrs;
        if let Some(fp) = attributes.get("FilterPolicy") {
            // An empty filter policy is accepted without validation.
            if !fp.is_empty() {
                validate_filter_policy(fp)?;
            }
            // A filter policy without an explicit scope applies to message attributes.
            if !attributes.contains_key("FilterPolicyScope") {
                attributes.insert(
                    "FilterPolicyScope".to_string(),
                    "MessageAttributes".to_string(),
                );
            }
        }

        let mut state = self.state.write();
        // Return the existing subscription for an identical
        // (topic, protocol, endpoint) triple instead of creating a duplicate.
        for sub in state.subscriptions.values() {
            if sub.topic_arn == topic_arn && sub.protocol == protocol && sub.endpoint == endpoint {
                return Ok(xml_resp(
                    &format!(
                        r#"<SubscribeResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
 <SubscribeResult>
 <SubscriptionArn>{}</SubscriptionArn>
 </SubscribeResult>
 <ResponseMetadata>
 <RequestId>{}</RequestId>
 </ResponseMetadata>
</SubscribeResponse>"#,
                        sub.subscription_arn, req.request_id
                    ),
                    &req.request_id,
                ));
            }
        }

        // Subscription ARN = topic ARN plus a fresh UUID suffix.
        let sub_arn = format!("{}:{}", topic_arn, uuid::Uuid::new_v4());

        // Only HTTP(S) endpoints have to confirm; everything else is
        // confirmed on creation.
        let confirmed = protocol != "http" && protocol != "https";
        let response_arn = if confirmed {
            sub_arn.clone()
        } else {
            "pending confirmation".to_string()
        };

        // Unconfirmed subscriptions carry the token `confirm_subscription` matches on.
        let confirmation_token = if !confirmed {
            Some(uuid::Uuid::new_v4().to_string())
        } else {
            None
        };

        let sub = SnsSubscription {
            subscription_arn: sub_arn.clone(),
            topic_arn,
            protocol,
            endpoint,
            owner: account_id,
            attributes,
            confirmed,
            confirmation_token,
        };

        state.subscriptions.insert(sub_arn.clone(), sub);

        Ok(xml_resp(
            &format!(
                r#"<SubscribeResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
 <SubscribeResult>
 <SubscriptionArn>{response_arn}</SubscriptionArn>
 </SubscribeResult>
 <ResponseMetadata>
 <RequestId>{}</RequestId>
 </ResponseMetadata>
</SubscribeResponse>"#,
                req.request_id
            ),
            &req.request_id,
        ))
    }
790
791 fn confirm_subscription(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
792 let topic_arn = required(req, "TopicArn")?;
793 let token = required(req, "Token")?;
794
795 let mut state = self.state.write();
796 let sub_arn = state
799 .subscriptions
800 .values()
801 .find(|s| {
802 s.topic_arn == topic_arn
803 && (s.confirmation_token.as_deref() == Some(&token)
804 || s.subscription_arn == token)
805 })
806 .map(|s| s.subscription_arn.clone())
807 .ok_or_else(|| {
808 AwsServiceError::aws_error(
809 StatusCode::NOT_FOUND,
810 "NotFound",
811 format!("No pending subscription found for token: {token}"),
812 )
813 })?;
814
815 if let Some(sub) = state.subscriptions.get_mut(&sub_arn) {
817 sub.confirmed = true;
818 }
819
820 Ok(xml_resp(
821 &format!(
822 r#"<ConfirmSubscriptionResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
823 <ConfirmSubscriptionResult>
824 <SubscriptionArn>{sub_arn}</SubscriptionArn>
825 </ConfirmSubscriptionResult>
826 <ResponseMetadata>
827 <RequestId>{}</RequestId>
828 </ResponseMetadata>
829</ConfirmSubscriptionResponse>"#,
830 req.request_id
831 ),
832 &req.request_id,
833 ))
834 }
835
836 fn unsubscribe(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
837 let sub_arn = required(req, "SubscriptionArn")?;
838 self.state.write().subscriptions.remove(&sub_arn);
839
840 Ok(xml_resp(
841 &format!(
842 r#"<UnsubscribeResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
843 <ResponseMetadata>
844 <RequestId>{}</RequestId>
845 </ResponseMetadata>
846</UnsubscribeResponse>"#,
847 req.request_id
848 ),
849 &req.request_id,
850 ))
851 }
852
    /// Handles `Publish`.
    ///
    /// The target is, in order of precedence, a phone number (`PhoneNumber`),
    /// a platform endpoint (an ARN containing `:endpoint/`), or a topic
    /// (`TopicArn`, falling back to `TargetArn`). For a topic the message is
    /// validated, recorded in the published-message log, and fanned out to
    /// the topic's SQS, HTTP, Lambda, email and SMS subscribers.
    ///
    /// # Errors
    /// `InvalidParameter` when no target is given, `Message` is missing, the
    /// subject or message is too long, a `json` message structure is invalid,
    /// or the FIFO-specific parameters do not fit the topic type; `NotFound`
    /// when the topic does not exist.
    fn publish(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        // `TargetArn` is accepted as an alternative to `TopicArn`.
        let topic_arn = param(req, "TopicArn").or_else(|| param(req, "TargetArn"));
        let phone_number = param(req, "PhoneNumber");

        if topic_arn.is_none() && phone_number.is_none() {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "InvalidParameter",
                "The request must contain the parameter TopicArn or TargetArn or PhoneNumber",
            ));
        }

        let message = required(req, "Message")?;
        let subject = param(req, "Subject");
        let message_group_id = param(req, "MessageGroupId");
        let message_dedup_id = param(req, "MessageDeduplicationId");
        let message_structure = param(req, "MessageStructure");

        if let Some(ref subj) = subject {
            // Length is measured in bytes (`str::len`).
            if subj.len() > 100 {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "InvalidParameter",
                    "Subject must be less than 100 characters",
                ));
            }
        }

        // 262144 bytes = 256 KiB.
        if message.len() > 262144 {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "InvalidParameter",
                "Invalid parameter: Message too long",
            ));
        }

        if message_structure.as_deref() == Some("json") {
            validate_message_structure_json(&message)?;
        }

        let message_attributes = parse_message_attributes(req);

        // A phone number takes precedence over any topic/target ARN.
        if let Some(ref phone) = phone_number {
            return self.publish_to_phone_number(
                req,
                phone,
                message,
                subject,
                message_attributes,
                message_group_id,
                message_dedup_id,
            );
        }

        let topic_arn = topic_arn.ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "InvalidParameter",
                "TopicArn or TargetArn is required",
            )
        })?;

        // ARNs containing ":endpoint/" are treated as platform endpoints, not topics.
        if topic_arn.contains(":endpoint/") {
            return self.publish_to_platform_endpoint(
                &topic_arn,
                &message,
                &message_attributes,
                &req.request_id,
            );
        }

        let mut state = self.state.write();
        let topic = state
            .topics
            .get(&topic_arn)
            .ok_or_else(|| not_found("Topic"))?;

        if topic.is_fifo {
            // FIFO topics need a group id, and a deduplication id unless
            // content-based deduplication is enabled on the topic.
            if message_group_id.is_none() {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "InvalidParameter",
                    "Invalid parameter: The request must contain the parameter MessageGroupId.",
                ));
            }
            let content_dedup = topic
                .attributes
                .get("ContentBasedDeduplication")
                .map(|v| v == "true")
                .unwrap_or(false);
            if !content_dedup && message_dedup_id.is_none() {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "InvalidParameter",
                    "Invalid parameter: The topic should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly",
                ));
            }
        } else {
            // Standard topics reject a deduplication id.
            if message_dedup_id.is_some() {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "InvalidParameter",
                    "Invalid parameter: The request includes MessageDeduplicationId parameter that is not valid for this topic type",
                ));
            }
        }

        // Record the message in the published-message log before fan-out.
        let msg_id = uuid::Uuid::new_v4().to_string();
        state.published.push(PublishedMessage {
            message_id: msg_id.clone(),
            topic_arn: topic_arn.clone(),
            message: message.clone(),
            subject: subject.clone(),
            message_attributes: message_attributes.clone(),
            message_group_id: message_group_id.clone(),
            message_dedup_id: message_dedup_id.clone(),
            timestamp: Utc::now(),
        });

        // With MessageStructure=json the message is a per-protocol map; it was
        // already validated above, so this parse is not expected to fail.
        let parsed_structure: Option<Value> = if message_structure.as_deref() == Some("json") {
            Some(serde_json::from_str(&message).map_err(|_| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "InvalidParameter",
                    "Invalid parameter: Message Structure - No JSON message body is parseable",
                )
            })?)
        } else {
            None
        };

        // Gather everything needed from state, then release the write guard
        // before any delivery happens (some delivery helpers take it again).
        let subscribers =
            collect_topic_subscribers(&state, &topic_arn, &message_attributes, &message);
        let endpoint = state.endpoint.clone();
        drop(state);

        // SQS subscribers get the "sqs" entry when present, else "default",
        // else the raw message.
        let sqs_message = if let Some(ref structure) = parsed_structure {
            structure
                .get("sqs")
                .or_else(|| structure.get("default"))
                .and_then(|v| v.as_str())
                .unwrap_or(&message)
                .to_string()
        } else {
            message.clone()
        };

        // All other subscribers get the "default" entry, else the raw message.
        let default_message = if let Some(ref structure) = parsed_structure {
            structure
                .get("default")
                .and_then(|v| v.as_str())
                .unwrap_or(&message)
                .to_string()
        } else {
            message.clone()
        };

        let envelope_attrs = build_envelope_attrs(&message_attributes);

        let ctx = TopicFanoutContext {
            msg_id: &msg_id,
            topic_arn: &topic_arn,
            subject: subject.as_deref(),
            endpoint: &endpoint,
            sqs_message: &sqs_message,
            default_message: &default_message,
            envelope_attrs: &envelope_attrs,
            message_attributes: &message_attributes,
            message_group_id: message_group_id.as_deref(),
            message_dedup_id: message_dedup_id.as_deref(),
        };

        self.deliver_to_sqs_subscribers(&subscribers.sqs, &ctx);
        self.deliver_to_http_subscribers(&subscribers.http, &ctx);
        self.deliver_to_lambda_subscribers(&subscribers.lambda, &ctx);
        self.deliver_to_email_subscribers(&subscribers.email, &ctx);
        self.deliver_to_sms_subscribers(&subscribers.sms, &ctx);

        Ok(xml_resp(
            &format!(
                r#"<PublishResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
 <PublishResult>
 <MessageId>{msg_id}</MessageId>
 </PublishResult>
 <ResponseMetadata>
 <RequestId>{}</RequestId>
 </ResponseMetadata>
</PublishResponse>"#,
                req.request_id
            ),
            &req.request_id,
        ))
    }
1058
1059 fn deliver_to_sqs_subscribers(&self, subs: &[(String, bool)], ctx: &TopicFanoutContext<'_>) {
1060 for (queue_arn, raw) in subs {
1061 if *raw {
1062 let mut sqs_msg_attrs = HashMap::new();
1063 for (k, v) in ctx.message_attributes {
1064 let mut attr = fakecloud_core::delivery::SqsMessageAttribute {
1065 data_type: v.data_type.clone(),
1066 string_value: v.string_value.clone(),
1067 binary_value: None,
1068 };
1069 if let Some(ref bv) = v.binary_value {
1070 attr.binary_value =
1071 Some(base64::engine::general_purpose::STANDARD.encode(bv));
1072 }
1073 sqs_msg_attrs.insert(k.clone(), attr);
1074 }
1075 self.delivery.send_to_sqs_with_attrs(
1076 queue_arn,
1077 ctx.sqs_message,
1078 &sqs_msg_attrs,
1079 ctx.message_group_id,
1080 ctx.message_dedup_id,
1081 );
1082 } else {
1083 let envelope_str = build_sns_envelope(
1084 ctx.msg_id,
1085 ctx.topic_arn,
1086 &ctx.subject.map(|s| s.to_string()),
1087 ctx.sqs_message,
1088 ctx.envelope_attrs,
1089 ctx.endpoint,
1090 );
1091 self.delivery
1092 .send_to_sqs(queue_arn, &envelope_str, &HashMap::new());
1093 }
1094 }
1095 }
1096
1097 fn deliver_to_http_subscribers(&self, subs: &[String], ctx: &TopicFanoutContext<'_>) {
1098 for endpoint_url in subs {
1099 let body = build_sns_envelope(
1100 ctx.msg_id,
1101 ctx.topic_arn,
1102 &ctx.subject.map(|s| s.to_string()),
1103 ctx.default_message,
1104 ctx.envelope_attrs,
1105 ctx.endpoint,
1106 );
1107 let endpoint_url = endpoint_url.clone();
1108 let topic = ctx.topic_arn.to_string();
1109 tokio::spawn(async move {
1110 let client = reqwest::Client::new();
1111 let result = client
1112 .post(&endpoint_url)
1113 .header("Content-Type", "application/json")
1114 .header("x-amz-sns-message-type", "Notification")
1115 .header("x-amz-sns-topic-arn", &topic)
1116 .body(body)
1117 .send()
1118 .await;
1119 if let Err(e) = result {
1120 tracing::warn!(endpoint = %endpoint_url, error = %e, "SNS HTTP delivery failed");
1121 }
1122 });
1123 }
1124 }
1125
1126 fn deliver_to_lambda_subscribers(
1127 &self,
1128 subs: &[(String, String)],
1129 ctx: &TopicFanoutContext<'_>,
1130 ) {
1131 if subs.is_empty() {
1132 return;
1133 }
1134 let now = Utc::now();
1135 let subject_owned = ctx.subject.map(|s| s.to_string());
1136
1137 let lambda_payloads: Vec<(String, String)> = subs
1138 .iter()
1139 .map(|(function_arn, subscription_arn)| {
1140 let payload = build_sns_lambda_event(&SnsLambdaEventInput {
1141 message_id: ctx.msg_id,
1142 topic_arn: ctx.topic_arn,
1143 subscription_arn,
1144 message: ctx.default_message,
1145 subject: ctx.subject,
1146 message_attributes: ctx.envelope_attrs,
1147 timestamp: &now,
1148 endpoint: ctx.endpoint,
1149 });
1150 (function_arn.clone(), payload)
1151 })
1152 .collect();
1153
1154 {
1155 let mut state = self.state.write();
1156 for (function_arn, _) in &lambda_payloads {
1157 state
1158 .lambda_invocations
1159 .push(crate::state::LambdaInvocation {
1160 function_arn: function_arn.clone(),
1161 message: ctx.default_message.to_string(),
1162 subject: subject_owned.clone(),
1163 timestamp: now,
1164 });
1165 }
1166 }
1167
1168 let delivery = self.delivery.clone();
1169 tokio::spawn(async move {
1170 for (function_arn, payload) in lambda_payloads {
1171 tracing::info!(function_arn = %function_arn, "SNS invoking Lambda function");
1172 match delivery.invoke_lambda(&function_arn, &payload).await {
1173 Some(Ok(_)) => {
1174 tracing::info!(
1175 function_arn = %function_arn,
1176 "SNS->Lambda invocation succeeded"
1177 );
1178 }
1179 Some(Err(e)) => {
1180 tracing::error!(
1181 function_arn = %function_arn,
1182 error = %e,
1183 "SNS->Lambda invocation failed"
1184 );
1185 }
1186 None => {
1187 tracing::debug!(
1188 function_arn = %function_arn,
1189 "SNS->Lambda: no container runtime, skipping real execution"
1190 );
1191 }
1192 }
1193 }
1194 });
1195 }
1196
1197 fn deliver_to_email_subscribers(&self, subs: &[String], ctx: &TopicFanoutContext<'_>) {
1198 if subs.is_empty() {
1199 return;
1200 }
1201 let now = Utc::now();
1202 let subject_owned = ctx.subject.map(|s| s.to_string());
1203 let mut state = self.state.write();
1204 for email_address in subs {
1205 tracing::info!(
1206 email = %email_address,
1207 topic_arn = %ctx.topic_arn,
1208 "SNS delivering to email (stub)"
1209 );
1210 state.sent_emails.push(crate::state::SentEmail {
1211 email_address: email_address.clone(),
1212 message: ctx.default_message.to_string(),
1213 subject: subject_owned.clone(),
1214 topic_arn: ctx.topic_arn.to_string(),
1215 timestamp: now,
1216 });
1217 }
1218 }
1219
1220 fn deliver_to_sms_subscribers(&self, subs: &[String], ctx: &TopicFanoutContext<'_>) {
1221 if subs.is_empty() {
1222 return;
1223 }
1224 let mut state = self.state.write();
1225 for phone_number in subs {
1226 tracing::info!(
1227 phone_number = %phone_number,
1228 topic_arn = %ctx.topic_arn,
1229 "SNS delivering to SMS (stub)"
1230 );
1231 state
1232 .sms_messages
1233 .push((phone_number.clone(), ctx.default_message.to_string()));
1234 }
1235 }
1236
1237 fn publish_batch(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1238 let topic_arn = required(req, "TopicArn")?;
1239
1240 let state = self.state.read();
1241 let topic = state
1242 .topics
1243 .get(&topic_arn)
1244 .ok_or_else(|| not_found("Topic"))?;
1245 let is_fifo = topic.is_fifo;
1246 let endpoint = state.endpoint.clone();
1247 drop(state);
1248
1249 let mut entries = Vec::new();
1251 for n in 1..=100 {
1252 let id_key = format!("PublishBatchRequestEntries.member.{n}.Id");
1253 if let Some(id) = req.query_params.get(&id_key) {
1254 let msg_key = format!("PublishBatchRequestEntries.member.{n}.Message");
1255 let message = req.query_params.get(&msg_key).cloned().unwrap_or_default();
1256 let subject_key = format!("PublishBatchRequestEntries.member.{n}.Subject");
1257 let subject = req.query_params.get(&subject_key).cloned();
1258 let group_key = format!("PublishBatchRequestEntries.member.{n}.MessageGroupId");
1259 let group_id = req.query_params.get(&group_key).cloned();
1260 let dedup_key =
1261 format!("PublishBatchRequestEntries.member.{n}.MessageDeduplicationId");
1262 let dedup_id = req.query_params.get(&dedup_key).cloned();
1263 let structure_key =
1264 format!("PublishBatchRequestEntries.member.{n}.MessageStructure");
1265 let message_structure = req.query_params.get(&structure_key).cloned();
1266 entries.push((
1267 id.clone(),
1268 message,
1269 subject,
1270 group_id,
1271 dedup_id,
1272 message_structure,
1273 ));
1274 } else {
1275 break;
1276 }
1277 }
1278
1279 if entries.len() > 10 {
1281 return Err(AwsServiceError::aws_error(
1282 StatusCode::BAD_REQUEST,
1283 "TooManyEntriesInBatchRequest",
1284 "The batch request contains more entries than permissible.",
1285 ));
1286 }
1287
1288 let ids: Vec<&str> = entries.iter().map(|e| e.0.as_str()).collect();
1290 let unique_ids: std::collections::HashSet<&str> = ids.iter().copied().collect();
1291 if unique_ids.len() != ids.len() {
1292 return Err(AwsServiceError::aws_error(
1293 StatusCode::BAD_REQUEST,
1294 "BatchEntryIdsNotDistinct",
1295 "Two or more batch entries in the request have the same Id.",
1296 ));
1297 }
1298
1299 if is_fifo && entries.iter().any(|e| e.3.is_none()) {
1301 return Err(AwsServiceError::aws_error(
1302 StatusCode::BAD_REQUEST,
1303 "InvalidParameter",
1304 "Invalid parameter: The MessageGroupId parameter is required for FIFO topics",
1305 ));
1306 }
1307
1308 let mut successful = Vec::new();
1309 let failed: Vec<String> = Vec::new();
1310
1311 for (idx, (id, message, subject, group_id, dedup_id, structure)) in
1312 entries.iter().enumerate()
1313 {
1314 let batch_attrs = parse_batch_message_attributes(req, idx + 1);
1316
1317 if structure.as_deref() == Some("json") {
1319 validate_message_structure_json(message)?;
1320 }
1321
1322 let msg_id = uuid::Uuid::new_v4().to_string();
1323 let mut state = self.state.write();
1324 state.published.push(PublishedMessage {
1325 message_id: msg_id.clone(),
1326 topic_arn: topic_arn.clone(),
1327 message: message.clone(),
1328 subject: subject.clone(),
1329 message_attributes: batch_attrs.clone(),
1330 message_group_id: group_id.clone(),
1331 message_dedup_id: dedup_id.clone(),
1332 timestamp: Utc::now(),
1333 });
1334
1335 let parsed_structure: Option<Value> = if structure.as_deref() == Some("json") {
1337 Some(serde_json::from_str(message).map_err(|_| {
1338 AwsServiceError::aws_error(
1339 StatusCode::BAD_REQUEST,
1340 "InvalidParameter",
1341 "Invalid parameter: Message Structure - No JSON message body is parseable",
1342 )
1343 })?)
1344 } else {
1345 None
1346 };
1347 let sqs_message = if let Some(ref s) = parsed_structure {
1348 s.get("sqs")
1349 .or_else(|| s.get("default"))
1350 .and_then(|v| v.as_str())
1351 .unwrap_or(message)
1352 .to_string()
1353 } else {
1354 message.clone()
1355 };
1356
1357 let sqs_subscribers: Vec<(String, bool)> = state
1359 .subscriptions
1360 .values()
1361 .filter(|s| s.topic_arn == topic_arn && s.protocol == "sqs" && s.confirmed)
1362 .map(|s| {
1363 let raw = s
1364 .attributes
1365 .get("RawMessageDelivery")
1366 .map(|v| v == "true")
1367 .unwrap_or(false);
1368 (s.endpoint.clone(), raw)
1369 })
1370 .collect();
1371 drop(state);
1372
1373 let mut envelope_attrs = serde_json::Map::new();
1375 for (key, attr) in &batch_attrs {
1376 let mut attr_obj = serde_json::Map::new();
1377 attr_obj.insert("Type".to_string(), Value::String(attr.data_type.clone()));
1378 if let Some(ref sv) = attr.string_value {
1379 attr_obj.insert("Value".to_string(), Value::String(sv.clone()));
1380 }
1381 if let Some(ref bv) = attr.binary_value {
1382 attr_obj.insert(
1383 "Value".to_string(),
1384 Value::String(base64::engine::general_purpose::STANDARD.encode(bv)),
1385 );
1386 }
1387 envelope_attrs.insert(key.clone(), Value::Object(attr_obj));
1388 }
1389
1390 for (queue_arn, raw) in &sqs_subscribers {
1391 if *raw {
1392 let mut sqs_msg_attrs = HashMap::new();
1393 for (k, v) in &batch_attrs {
1394 let mut attr = fakecloud_core::delivery::SqsMessageAttribute {
1395 data_type: v.data_type.clone(),
1396 string_value: v.string_value.clone(),
1397 binary_value: None,
1398 };
1399 if let Some(ref bv) = v.binary_value {
1400 attr.binary_value =
1401 Some(base64::engine::general_purpose::STANDARD.encode(bv));
1402 }
1403 sqs_msg_attrs.insert(k.clone(), attr);
1404 }
1405 self.delivery.send_to_sqs_with_attrs(
1406 queue_arn,
1407 &sqs_message,
1408 &sqs_msg_attrs,
1409 if is_fifo { group_id.as_deref() } else { None },
1410 if is_fifo { dedup_id.as_deref() } else { None },
1411 );
1412 } else {
1413 let envelope_str = build_sns_envelope(
1414 &msg_id,
1415 &topic_arn,
1416 subject,
1417 &sqs_message,
1418 &envelope_attrs,
1419 &endpoint,
1420 );
1421 self.delivery
1422 .send_to_sqs(queue_arn, &envelope_str, &HashMap::new());
1423 }
1424 }
1425
1426 successful.push(format!(
1427 r#" <member>
1428 <Id>{id}</Id>
1429 <MessageId>{msg_id}</MessageId>
1430 </member>"#
1431 ));
1432 }
1433
1434 let successful_xml = successful.join("\n");
1435 let failed_xml = failed.join("\n");
1436
1437 Ok(xml_resp(
1438 &format!(
1439 r#"<PublishBatchResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
1440 <PublishBatchResult>
1441 <Successful>
1442{successful_xml}
1443 </Successful>
1444 <Failed>
1445{failed_xml}
1446 </Failed>
1447 </PublishBatchResult>
1448 <ResponseMetadata>
1449 <RequestId>{}</RequestId>
1450 </ResponseMetadata>
1451</PublishBatchResponse>"#,
1452 req.request_id
1453 ),
1454 &req.request_id,
1455 ))
1456 }
1457
1458 #[allow(clippy::too_many_arguments)]
1463 fn publish_to_phone_number(
1464 &self,
1465 req: &AwsRequest,
1466 phone: &str,
1467 message: String,
1468 subject: Option<String>,
1469 message_attributes: HashMap<String, MessageAttribute>,
1470 message_group_id: Option<String>,
1471 message_dedup_id: Option<String>,
1472 ) -> Result<AwsResponse, AwsServiceError> {
1473 let is_valid_e164 = phone.starts_with('+')
1474 && phone.len() >= 2
1475 && phone[1..].chars().all(|c| c.is_ascii_digit());
1476 if !is_valid_e164 {
1477 return Err(AwsServiceError::aws_error(
1478 StatusCode::BAD_REQUEST,
1479 "InvalidParameter",
1480 format!(
1481 "Invalid parameter: PhoneNumber Reason: {phone} does not meet the E164 format"
1482 ),
1483 ));
1484 }
1485
1486 if message.len() > 1600 {
1487 return Err(AwsServiceError::aws_error(
1488 StatusCode::BAD_REQUEST,
1489 "InvalidParameter",
1490 "Invalid parameter: Message Reason: Message must be less than 1600 characters long",
1491 ));
1492 }
1493
1494 let msg_id = uuid::Uuid::new_v4().to_string();
1495 let mut state = self.state.write();
1496 state
1497 .sms_messages
1498 .push((phone.to_string(), message.clone()));
1499 state.published.push(PublishedMessage {
1500 message_id: msg_id.clone(),
1501 topic_arn: String::new(),
1502 message,
1503 subject,
1504 message_attributes,
1505 message_group_id,
1506 message_dedup_id,
1507 timestamp: Utc::now(),
1508 });
1509
1510 Ok(xml_resp(
1511 &format!(
1512 r#"<PublishResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
1513 <PublishResult>
1514 <MessageId>{msg_id}</MessageId>
1515 </PublishResult>
1516 <ResponseMetadata>
1517 <RequestId>{}</RequestId>
1518 </ResponseMetadata>
1519</PublishResponse>"#,
1520 req.request_id
1521 ),
1522 &req.request_id,
1523 ))
1524 }
1525
1526 fn publish_to_platform_endpoint(
1527 &self,
1528 endpoint_arn: &str,
1529 message: &str,
1530 message_attributes: &HashMap<String, MessageAttribute>,
1531 request_id: &str,
1532 ) -> Result<AwsResponse, AwsServiceError> {
1533 let state = self.state.read();
1534
1535 let mut found_endpoint: Option<&PlatformEndpoint> = None;
1537 for app in state.platform_applications.values() {
1538 if let Some(ep) = app.endpoints.get(endpoint_arn) {
1539 found_endpoint = Some(ep);
1540 break;
1541 }
1542 }
1543
1544 let ep = found_endpoint.ok_or_else(|| {
1545 AwsServiceError::aws_error(StatusCode::NOT_FOUND, "NotFound", "Endpoint does not exist")
1546 })?;
1547
1548 if !ep.enabled {
1549 return Err(AwsServiceError::aws_error(
1550 StatusCode::BAD_REQUEST,
1551 "EndpointDisabled",
1552 "Endpoint is disabled",
1553 ));
1554 }
1555 drop(state);
1556
1557 let msg_id = uuid::Uuid::new_v4().to_string();
1558 let mut state = self.state.write();
1559 for app in state.platform_applications.values_mut() {
1561 if let Some(ep) = app.endpoints.get_mut(endpoint_arn) {
1562 ep.messages.push(PublishedMessage {
1563 message_id: msg_id.clone(),
1564 topic_arn: endpoint_arn.to_string(),
1565 message: message.to_string(),
1566 subject: None,
1567 message_attributes: message_attributes.clone(),
1568 message_group_id: None,
1569 message_dedup_id: None,
1570 timestamp: Utc::now(),
1571 });
1572 break;
1573 }
1574 }
1575
1576 Ok(xml_resp(
1577 &format!(
1578 r#"<PublishResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
1579 <PublishResult>
1580 <MessageId>{msg_id}</MessageId>
1581 </PublishResult>
1582 <ResponseMetadata>
1583 <RequestId>{request_id}</RequestId>
1584 </ResponseMetadata>
1585</PublishResponse>"#,
1586 ),
1587 request_id,
1588 ))
1589 }
1590
1591 fn list_subscriptions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1592 let state = self.state.read();
1593
1594 let all_subs: Vec<&SnsSubscription> = state.subscriptions.values().collect();
1595 let next_token = param(req, "NextToken")
1596 .and_then(|t| t.parse::<usize>().ok())
1597 .unwrap_or(0);
1598 let next_token = next_token.min(all_subs.len());
1599
1600 let page = &all_subs[next_token..];
1601 let has_more = page.len() > DEFAULT_PAGE_SIZE;
1602 let page = if has_more {
1603 &page[..DEFAULT_PAGE_SIZE]
1604 } else {
1605 page
1606 };
1607
1608 let members: String = page
1609 .iter()
1610 .map(|s| format_sub_member(s))
1611 .collect::<Vec<_>>()
1612 .join("\n");
1613
1614 let next_token_xml = if has_more {
1615 format!(
1616 "\n <NextToken>{}</NextToken>",
1617 next_token + DEFAULT_PAGE_SIZE
1618 )
1619 } else {
1620 String::new()
1621 };
1622
1623 Ok(xml_resp(
1624 &format!(
1625 r#"<ListSubscriptionsResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
1626 <ListSubscriptionsResult>
1627 <Subscriptions>
1628{members}
1629 </Subscriptions>{next_token_xml}
1630 </ListSubscriptionsResult>
1631 <ResponseMetadata>
1632 <RequestId>{}</RequestId>
1633 </ResponseMetadata>
1634</ListSubscriptionsResponse>"#,
1635 req.request_id
1636 ),
1637 &req.request_id,
1638 ))
1639 }
1640
1641 fn list_subscriptions_by_topic(
1642 &self,
1643 req: &AwsRequest,
1644 ) -> Result<AwsResponse, AwsServiceError> {
1645 let topic_arn = required(req, "TopicArn")?;
1646 let state = self.state.read();
1647
1648 let all_subs: Vec<&SnsSubscription> = state
1649 .subscriptions
1650 .values()
1651 .filter(|s| s.topic_arn == topic_arn)
1652 .collect();
1653
1654 let next_token = param(req, "NextToken")
1655 .and_then(|t| t.parse::<usize>().ok())
1656 .unwrap_or(0);
1657 let next_token = next_token.min(all_subs.len());
1658
1659 let page = &all_subs[next_token..];
1660 let has_more = page.len() > DEFAULT_PAGE_SIZE;
1661 let page = if has_more {
1662 &page[..DEFAULT_PAGE_SIZE]
1663 } else {
1664 page
1665 };
1666
1667 let members: String = page
1668 .iter()
1669 .map(|s| format_sub_member(s))
1670 .collect::<Vec<_>>()
1671 .join("\n");
1672
1673 let next_token_xml = if has_more {
1674 format!(
1675 "\n <NextToken>{}</NextToken>",
1676 next_token + DEFAULT_PAGE_SIZE
1677 )
1678 } else {
1679 String::new()
1680 };
1681
1682 Ok(xml_resp(
1683 &format!(
1684 r#"<ListSubscriptionsByTopicResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
1685 <ListSubscriptionsByTopicResult>
1686 <Subscriptions>
1687{members}
1688 </Subscriptions>{next_token_xml}
1689 </ListSubscriptionsByTopicResult>
1690 <ResponseMetadata>
1691 <RequestId>{}</RequestId>
1692 </ResponseMetadata>
1693</ListSubscriptionsByTopicResponse>"#,
1694 req.request_id
1695 ),
1696 &req.request_id,
1697 ))
1698 }
1699
1700 fn get_subscription_attributes(
1701 &self,
1702 req: &AwsRequest,
1703 ) -> Result<AwsResponse, AwsServiceError> {
1704 let sub_arn = required(req, "SubscriptionArn")?;
1705 let state = self.state.read();
1706 let sub = state
1707 .subscriptions
1708 .get(&sub_arn)
1709 .ok_or_else(|| not_found("Subscription"))?;
1710
1711 let mut entries = vec![
1712 format_attr("SubscriptionArn", &sub.subscription_arn),
1713 format_attr("TopicArn", &sub.topic_arn),
1714 format_attr("Protocol", &sub.protocol),
1715 format_attr("Endpoint", &sub.endpoint),
1716 format_attr("Owner", &sub.owner),
1717 format_attr("ConfirmationWasAuthenticated", "true"),
1718 format_attr("PendingConfirmation", "false"),
1719 ];
1720
1721 if !sub.attributes.contains_key("RawMessageDelivery") {
1723 entries.push(format_attr("RawMessageDelivery", "false"));
1724 }
1725
1726 entries.push(format_attr(
1728 "EffectiveDeliveryPolicy",
1729 DEFAULT_EFFECTIVE_DELIVERY_POLICY,
1730 ));
1731
1732 for (k, v) in &sub.attributes {
1733 if k == "FilterPolicy" && v.is_empty() {
1735 continue;
1736 }
1737 if k == "FilterPolicyScope" {
1739 let has_filter = sub
1740 .attributes
1741 .get("FilterPolicy")
1742 .map(|v| !v.is_empty())
1743 .unwrap_or(false);
1744 if !has_filter {
1745 continue;
1746 }
1747 }
1748 entries.push(format_attr(k, v));
1749 }
1750 let attrs = entries.join("\n");
1751
1752 Ok(xml_resp(
1753 &format!(
1754 r#"<GetSubscriptionAttributesResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
1755 <GetSubscriptionAttributesResult>
1756 <Attributes>
1757{attrs}
1758 </Attributes>
1759 </GetSubscriptionAttributesResult>
1760 <ResponseMetadata>
1761 <RequestId>{}</RequestId>
1762 </ResponseMetadata>
1763</GetSubscriptionAttributesResponse>"#,
1764 req.request_id
1765 ),
1766 &req.request_id,
1767 ))
1768 }
1769
1770 fn set_subscription_attributes(
1771 &self,
1772 req: &AwsRequest,
1773 ) -> Result<AwsResponse, AwsServiceError> {
1774 let sub_arn = required(req, "SubscriptionArn")?;
1775 let attr_name = required(req, "AttributeName")?;
1776 let attr_value = param(req, "AttributeValue").unwrap_or_default();
1777
1778 if !VALID_SUBSCRIPTION_ATTRS.contains(&attr_name.as_str()) {
1780 return Err(AwsServiceError::aws_error(
1781 StatusCode::BAD_REQUEST,
1782 "InvalidParameter",
1783 "Invalid parameter: AttributeName".to_string(),
1784 ));
1785 }
1786
1787 if attr_name == "FilterPolicy" && !attr_value.is_empty() {
1789 validate_filter_policy(&attr_value)?;
1790 }
1791
1792 let mut state = self.state.write();
1793 let sub = state
1794 .subscriptions
1795 .get_mut(&sub_arn)
1796 .ok_or_else(|| not_found("Subscription"))?;
1797
1798 sub.attributes.insert(attr_name.clone(), attr_value.clone());
1799
1800 if attr_name == "FilterPolicy" && !attr_value.is_empty() {
1802 sub.attributes
1803 .entry("FilterPolicyScope".to_string())
1804 .or_insert_with(|| "MessageAttributes".to_string());
1805 }
1806
1807 Ok(xml_resp(
1808 &format!(
1809 r#"<SetSubscriptionAttributesResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
1810 <ResponseMetadata>
1811 <RequestId>{}</RequestId>
1812 </ResponseMetadata>
1813</SetSubscriptionAttributesResponse>"#,
1814 req.request_id
1815 ),
1816 &req.request_id,
1817 ))
1818 }
1819
1820 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1821 let resource_arn = required(req, "ResourceArn")?;
1822 let new_tags = parse_tags(req);
1823
1824 let mut state = self.state.write();
1825 let topic = state.topics.get_mut(&resource_arn).ok_or_else(|| {
1826 AwsServiceError::aws_error(
1827 StatusCode::NOT_FOUND,
1828 "ResourceNotFound",
1829 "Resource does not exist",
1830 )
1831 })?;
1832
1833 let mut merged = topic.tags.clone();
1835 for (k, v) in &new_tags {
1836 if let Some(pos) = merged.iter().position(|(ek, _)| ek == k) {
1838 merged[pos] = (k.clone(), v.clone());
1839 } else {
1840 merged.push((k.clone(), v.clone()));
1841 }
1842 }
1843 if merged.len() > 50 {
1844 return Err(AwsServiceError::aws_error(
1845 StatusCode::BAD_REQUEST,
1846 "TagLimitExceeded",
1847 "Could not complete request: tag quota of per resource exceeded",
1848 ));
1849 }
1850
1851 topic.tags = merged;
1852
1853 Ok(xml_resp(
1854 &format!(
1855 r#"<TagResourceResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
1856 <TagResourceResult/>
1857 <ResponseMetadata>
1858 <RequestId>{}</RequestId>
1859 </ResponseMetadata>
1860</TagResourceResponse>"#,
1861 req.request_id
1862 ),
1863 &req.request_id,
1864 ))
1865 }
1866
1867 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1868 let resource_arn = required(req, "ResourceArn")?;
1869 let tag_keys = parse_tag_keys(req);
1870
1871 let mut state = self.state.write();
1872 let topic = state.topics.get_mut(&resource_arn).ok_or_else(|| {
1873 AwsServiceError::aws_error(
1874 StatusCode::NOT_FOUND,
1875 "ResourceNotFound",
1876 "Resource does not exist",
1877 )
1878 })?;
1879 topic.tags.retain(|(k, _)| !tag_keys.contains(k));
1880
1881 Ok(xml_resp(
1882 &format!(
1883 r#"<UntagResourceResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
1884 <UntagResourceResult/>
1885 <ResponseMetadata>
1886 <RequestId>{}</RequestId>
1887 </ResponseMetadata>
1888</UntagResourceResponse>"#,
1889 req.request_id
1890 ),
1891 &req.request_id,
1892 ))
1893 }
1894
1895 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1896 let resource_arn = required(req, "ResourceArn")?;
1897 let state = self.state.read();
1898 let topic = state.topics.get(&resource_arn).ok_or_else(|| {
1899 AwsServiceError::aws_error(
1900 StatusCode::NOT_FOUND,
1901 "ResourceNotFound",
1902 "Resource does not exist",
1903 )
1904 })?;
1905
1906 let members: String = topic
1907 .tags
1908 .iter()
1909 .map(|(k, v)| format!(" <member><Key>{k}</Key><Value>{v}</Value></member>"))
1910 .collect::<Vec<_>>()
1911 .join("\n");
1912
1913 Ok(xml_resp(
1914 &format!(
1915 r#"<ListTagsForResourceResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
1916 <ListTagsForResourceResult>
1917 <Tags>
1918{members}
1919 </Tags>
1920 </ListTagsForResourceResult>
1921 <ResponseMetadata>
1922 <RequestId>{}</RequestId>
1923 </ResponseMetadata>
1924</ListTagsForResourceResponse>"#,
1925 req.request_id
1926 ),
1927 &req.request_id,
1928 ))
1929 }
1930
1931 fn add_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1932 let topic_arn = required(req, "TopicArn")?;
1933 let label = required(req, "Label")?;
1934
1935 let mut account_ids = Vec::new();
1937 for n in 1..=20 {
1938 let key = format!("AWSAccountId.member.{n}");
1939 if let Some(v) = req.query_params.get(&key) {
1940 account_ids.push(v.clone());
1941 } else {
1942 break;
1943 }
1944 }
1945
1946 let mut action_names = Vec::new();
1947 for n in 1..=20 {
1948 let key = format!("ActionName.member.{n}");
1949 if let Some(v) = req.query_params.get(&key) {
1950 action_names.push(v.clone());
1951 } else {
1952 break;
1953 }
1954 }
1955
1956 for action in &action_names {
1958 if !VALID_SNS_ACTIONS.contains(&action.as_str()) {
1959 return Err(AwsServiceError::aws_error(
1960 StatusCode::BAD_REQUEST,
1961 "InvalidParameter",
1962 "Policy statement action out of service scope!",
1963 ));
1964 }
1965 }
1966
1967 let mut state = self.state.write();
1968 let account_id = state.account_id.clone();
1969 let topic = state
1970 .topics
1971 .get_mut(&topic_arn)
1972 .ok_or_else(|| not_found("Topic"))?;
1973
1974 let policy_str = topic
1976 .attributes
1977 .get("Policy")
1978 .cloned()
1979 .unwrap_or_else(|| default_policy(&topic_arn, &account_id));
1980
1981 let mut policy: Value = serde_json::from_str(&policy_str)
1982 .or_else(|_| serde_json::from_str(&default_policy(&topic_arn, &account_id)))
1983 .map_err(|_| {
1984 AwsServiceError::aws_error(
1985 StatusCode::INTERNAL_SERVER_ERROR,
1986 "InternalError",
1987 "Failed to parse topic policy",
1988 )
1989 })?;
1990
1991 if let Some(statements) = policy["Statement"].as_array() {
1993 for stmt in statements {
1994 if stmt["Sid"].as_str() == Some(&label) {
1995 return Err(AwsServiceError::aws_error(
1996 StatusCode::BAD_REQUEST,
1997 "InvalidParameter",
1998 "Statement already exists",
1999 ));
2000 }
2001 }
2002 }
2003
2004 let principal = if account_ids.len() == 1 {
2006 Value::String(Arn::global("iam", &account_ids[0], "root").to_string())
2007 } else {
2008 Value::Array(
2009 account_ids
2010 .iter()
2011 .map(|id| Value::String(Arn::global("iam", id, "root").to_string()))
2012 .collect(),
2013 )
2014 };
2015
2016 let action = if action_names.len() == 1 {
2018 Value::String(format!("SNS:{}", action_names[0]))
2019 } else {
2020 Value::Array(
2021 action_names
2022 .iter()
2023 .map(|a| Value::String(format!("SNS:{}", a)))
2024 .collect(),
2025 )
2026 };
2027
2028 let new_statement = serde_json::json!({
2029 "Sid": label,
2030 "Effect": "Allow",
2031 "Principal": {"AWS": principal},
2032 "Action": action,
2033 "Resource": topic_arn,
2034 });
2035
2036 if let Some(statements) = policy["Statement"].as_array_mut() {
2037 statements.push(new_statement);
2038 }
2039
2040 topic
2041 .attributes
2042 .insert("Policy".to_string(), policy.to_string());
2043
2044 Ok(xml_resp(
2045 &format!(
2046 r#"<AddPermissionResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2047 <ResponseMetadata>
2048 <RequestId>{}</RequestId>
2049 </ResponseMetadata>
2050</AddPermissionResponse>"#,
2051 req.request_id
2052 ),
2053 &req.request_id,
2054 ))
2055 }
2056
2057 fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2058 let topic_arn = required(req, "TopicArn")?;
2059 let label = required(req, "Label")?;
2060
2061 let mut state = self.state.write();
2062 let topic = state
2063 .topics
2064 .get_mut(&topic_arn)
2065 .ok_or_else(|| not_found("Topic"))?;
2066
2067 if let Some(policy_str) = topic.attributes.get("Policy").cloned() {
2068 if let Ok(mut policy) = serde_json::from_str::<Value>(&policy_str) {
2069 if let Some(statements) = policy["Statement"].as_array_mut() {
2070 statements.retain(|s| s["Sid"].as_str() != Some(&label));
2071 }
2072 topic
2073 .attributes
2074 .insert("Policy".to_string(), policy.to_string());
2075 }
2076 }
2077
2078 Ok(xml_resp(
2079 &format!(
2080 r#"<RemovePermissionResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2081 <ResponseMetadata>
2082 <RequestId>{}</RequestId>
2083 </ResponseMetadata>
2084</RemovePermissionResponse>"#,
2085 req.request_id
2086 ),
2087 &req.request_id,
2088 ))
2089 }
2090
2091 fn create_platform_application(
2094 &self,
2095 req: &AwsRequest,
2096 ) -> Result<AwsResponse, AwsServiceError> {
2097 let name = required(req, "Name")?;
2098 let platform = required(req, "Platform")?;
2099 let attributes = parse_entries(req, "Attributes");
2100
2101 let mut state = self.state.write();
2102 let arn = format!(
2103 "arn:aws:sns:{}:{}:app/{}/{}",
2104 req.region, state.account_id, platform, name
2105 );
2106
2107 state.platform_applications.insert(
2108 arn.clone(),
2109 PlatformApplication {
2110 arn: arn.clone(),
2111 name,
2112 platform,
2113 attributes,
2114 endpoints: HashMap::new(),
2115 },
2116 );
2117
2118 Ok(xml_resp(
2119 &format!(
2120 r#"<CreatePlatformApplicationResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2121 <CreatePlatformApplicationResult>
2122 <PlatformApplicationArn>{arn}</PlatformApplicationArn>
2123 </CreatePlatformApplicationResult>
2124 <ResponseMetadata>
2125 <RequestId>{}</RequestId>
2126 </ResponseMetadata>
2127</CreatePlatformApplicationResponse>"#,
2128 req.request_id
2129 ),
2130 &req.request_id,
2131 ))
2132 }
2133
2134 fn delete_platform_application(
2135 &self,
2136 req: &AwsRequest,
2137 ) -> Result<AwsResponse, AwsServiceError> {
2138 let arn = required(req, "PlatformApplicationArn")?;
2139 self.state.write().platform_applications.remove(&arn);
2140
2141 Ok(xml_resp(
2142 &format!(
2143 r#"<DeletePlatformApplicationResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2144 <ResponseMetadata>
2145 <RequestId>{}</RequestId>
2146 </ResponseMetadata>
2147</DeletePlatformApplicationResponse>"#,
2148 req.request_id
2149 ),
2150 &req.request_id,
2151 ))
2152 }
2153
2154 fn get_platform_application_attributes(
2155 &self,
2156 req: &AwsRequest,
2157 ) -> Result<AwsResponse, AwsServiceError> {
2158 let arn = required(req, "PlatformApplicationArn")?;
2159 let state = self.state.read();
2160 let app = state
2161 .platform_applications
2162 .get(&arn)
2163 .ok_or_else(|| not_found("PlatformApplication"))?;
2164
2165 let attrs: String = app
2166 .attributes
2167 .iter()
2168 .map(|(k, v)| format_attr(k, v))
2169 .collect::<Vec<_>>()
2170 .join("\n");
2171
2172 Ok(xml_resp(
2173 &format!(
2174 r#"<GetPlatformApplicationAttributesResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2175 <GetPlatformApplicationAttributesResult>
2176 <Attributes>
2177{attrs}
2178 </Attributes>
2179 </GetPlatformApplicationAttributesResult>
2180 <ResponseMetadata>
2181 <RequestId>{}</RequestId>
2182 </ResponseMetadata>
2183</GetPlatformApplicationAttributesResponse>"#,
2184 req.request_id
2185 ),
2186 &req.request_id,
2187 ))
2188 }
2189
2190 fn set_platform_application_attributes(
2191 &self,
2192 req: &AwsRequest,
2193 ) -> Result<AwsResponse, AwsServiceError> {
2194 let arn = required(req, "PlatformApplicationArn")?;
2195 let new_attrs = parse_entries(req, "Attributes");
2196
2197 let mut state = self.state.write();
2198 let app = state
2199 .platform_applications
2200 .get_mut(&arn)
2201 .ok_or_else(|| not_found("PlatformApplication"))?;
2202
2203 for (k, v) in new_attrs {
2204 app.attributes.insert(k, v);
2205 }
2206
2207 Ok(xml_resp(
2208 &format!(
2209 r#"<SetPlatformApplicationAttributesResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2210 <ResponseMetadata>
2211 <RequestId>{}</RequestId>
2212 </ResponseMetadata>
2213</SetPlatformApplicationAttributesResponse>"#,
2214 req.request_id
2215 ),
2216 &req.request_id,
2217 ))
2218 }
2219
2220 fn list_platform_applications(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2221 let state = self.state.read();
2222
2223 let members: String = state
2224 .platform_applications
2225 .values()
2226 .map(|app| {
2227 let attrs: String = app
2228 .attributes
2229 .iter()
2230 .map(|(k, v)| format_attr(k, v))
2231 .collect::<Vec<_>>()
2232 .join("\n");
2233 format!(
2234 r#" <member>
2235 <PlatformApplicationArn>{}</PlatformApplicationArn>
2236 <Attributes>
2237{attrs}
2238 </Attributes>
2239 </member>"#,
2240 app.arn
2241 )
2242 })
2243 .collect::<Vec<_>>()
2244 .join("\n");
2245
2246 Ok(xml_resp(
2247 &format!(
2248 r#"<ListPlatformApplicationsResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2249 <ListPlatformApplicationsResult>
2250 <PlatformApplications>
2251{members}
2252 </PlatformApplications>
2253 </ListPlatformApplicationsResult>
2254 <ResponseMetadata>
2255 <RequestId>{}</RequestId>
2256 </ResponseMetadata>
2257</ListPlatformApplicationsResponse>"#,
2258 req.request_id
2259 ),
2260 &req.request_id,
2261 ))
2262 }
2263
2264 fn create_platform_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2265 let app_arn = required(req, "PlatformApplicationArn")?;
2266 let token = required(req, "Token")?;
2267 let custom_user_data = param(req, "CustomUserData");
2268 let attrs = parse_entries(req, "Attributes");
2269
2270 let mut state = self.state.write();
2271 let account_id = state.account_id.clone();
2272 let app = state
2273 .platform_applications
2274 .get_mut(&app_arn)
2275 .ok_or_else(|| not_found("PlatformApplication"))?;
2276
2277 for (arn, ep) in &app.endpoints {
2279 if ep.token == token {
2280 let existing_enabled = ep
2282 .attributes
2283 .get("Enabled")
2284 .cloned()
2285 .unwrap_or_else(|| "true".to_string());
2286 let new_enabled = attrs
2287 .get("Enabled")
2288 .cloned()
2289 .unwrap_or_else(|| "true".to_string());
2290 let custom_matches = match (&custom_user_data, ep.attributes.get("CustomUserData"))
2291 {
2292 (Some(new), Some(old)) => new == old,
2293 (None, None) => true,
2294 (None, Some(_)) => true,
2295 _ => false,
2296 };
2297
2298 if existing_enabled == new_enabled && custom_matches {
2299 return Ok(xml_resp(
2301 &format!(
2302 r#"<CreatePlatformEndpointResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2303 <CreatePlatformEndpointResult>
2304 <EndpointArn>{arn}</EndpointArn>
2305 </CreatePlatformEndpointResult>
2306 <ResponseMetadata>
2307 <RequestId>{}</RequestId>
2308 </ResponseMetadata>
2309</CreatePlatformEndpointResponse>"#,
2310 req.request_id
2311 ),
2312 &req.request_id,
2313 ));
2314 } else {
2315 return Err(AwsServiceError::aws_error(
2316 StatusCode::BAD_REQUEST,
2317 "InvalidParameter",
2318 format!("Invalid parameter: Token Reason: Endpoint {} already exists with the same Token, but different attributes.", arn),
2319 ));
2320 }
2321 }
2322 }
2323
2324 let endpoint_id = uuid::Uuid::new_v4().to_string().replace('-', "");
2325 let endpoint_arn = format!(
2326 "arn:aws:sns:{}:{}:endpoint/{}/{}/{}",
2327 req.region, account_id, app.platform, app.name, endpoint_id
2328 );
2329
2330 let mut endpoint_attrs = attrs;
2331 endpoint_attrs
2332 .entry("Enabled".to_string())
2333 .or_insert_with(|| "true".to_string());
2334 endpoint_attrs.insert("Token".to_string(), token.clone());
2335 if let Some(ref ud) = custom_user_data {
2336 endpoint_attrs
2337 .entry("CustomUserData".to_string())
2338 .or_insert_with(|| ud.clone());
2339 }
2340
2341 let enabled = endpoint_attrs
2342 .get("Enabled")
2343 .map(|v| v == "true")
2344 .unwrap_or(true);
2345
2346 app.endpoints.insert(
2347 endpoint_arn.clone(),
2348 PlatformEndpoint {
2349 arn: endpoint_arn.clone(),
2350 token,
2351 attributes: endpoint_attrs,
2352 enabled,
2353 messages: Vec::new(),
2354 },
2355 );
2356
2357 Ok(xml_resp(
2358 &format!(
2359 r#"<CreatePlatformEndpointResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2360 <CreatePlatformEndpointResult>
2361 <EndpointArn>{endpoint_arn}</EndpointArn>
2362 </CreatePlatformEndpointResult>
2363 <ResponseMetadata>
2364 <RequestId>{}</RequestId>
2365 </ResponseMetadata>
2366</CreatePlatformEndpointResponse>"#,
2367 req.request_id
2368 ),
2369 &req.request_id,
2370 ))
2371 }
2372
2373 fn delete_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2374 let endpoint_arn = required(req, "EndpointArn")?;
2375
2376 let mut state = self.state.write();
2377 for app in state.platform_applications.values_mut() {
2378 app.endpoints.remove(&endpoint_arn);
2379 }
2380
2381 Ok(xml_resp(
2382 &format!(
2383 r#"<DeleteEndpointResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2384 <ResponseMetadata>
2385 <RequestId>{}</RequestId>
2386 </ResponseMetadata>
2387</DeleteEndpointResponse>"#,
2388 req.request_id
2389 ),
2390 &req.request_id,
2391 ))
2392 }
2393
2394 fn get_endpoint_attributes(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2395 let endpoint_arn = required(req, "EndpointArn")?;
2396
2397 let state = self.state.read();
2398 for app in state.platform_applications.values() {
2399 if let Some(ep) = app.endpoints.get(&endpoint_arn) {
2400 let attrs: String = ep
2401 .attributes
2402 .iter()
2403 .map(|(k, v)| format_attr(k, v))
2404 .collect::<Vec<_>>()
2405 .join("\n");
2406
2407 return Ok(xml_resp(
2408 &format!(
2409 r#"<GetEndpointAttributesResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2410 <GetEndpointAttributesResult>
2411 <Attributes>
2412{attrs}
2413 </Attributes>
2414 </GetEndpointAttributesResult>
2415 <ResponseMetadata>
2416 <RequestId>{}</RequestId>
2417 </ResponseMetadata>
2418</GetEndpointAttributesResponse>"#,
2419 req.request_id
2420 ),
2421 &req.request_id,
2422 ));
2423 }
2424 }
2425
2426 Err(not_found("Endpoint"))
2427 }
2428
2429 fn set_endpoint_attributes(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2430 let endpoint_arn = required(req, "EndpointArn")?;
2431 let new_attrs = parse_entries(req, "Attributes");
2432
2433 let mut state = self.state.write();
2434 for app in state.platform_applications.values_mut() {
2435 if let Some(ep) = app.endpoints.get_mut(&endpoint_arn) {
2436 for (k, v) in new_attrs {
2437 if k == "Enabled" {
2438 ep.enabled = v == "true";
2439 }
2440 ep.attributes.insert(k, v);
2441 }
2442
2443 return Ok(xml_resp(
2444 &format!(
2445 r#"<SetEndpointAttributesResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2446 <ResponseMetadata>
2447 <RequestId>{}</RequestId>
2448 </ResponseMetadata>
2449</SetEndpointAttributesResponse>"#,
2450 req.request_id
2451 ),
2452 &req.request_id,
2453 ));
2454 }
2455 }
2456
2457 Err(not_found("Endpoint"))
2458 }
2459
2460 fn list_endpoints_by_platform_application(
2461 &self,
2462 req: &AwsRequest,
2463 ) -> Result<AwsResponse, AwsServiceError> {
2464 let app_arn = required(req, "PlatformApplicationArn")?;
2465
2466 let state = self.state.read();
2467 let app = state
2468 .platform_applications
2469 .get(&app_arn)
2470 .ok_or_else(|| not_found("PlatformApplication"))?;
2471
2472 let members: String = app
2473 .endpoints
2474 .values()
2475 .map(|ep| {
2476 let attrs: String = ep
2477 .attributes
2478 .iter()
2479 .map(|(k, v)| format_attr(k, v))
2480 .collect::<Vec<_>>()
2481 .join("\n");
2482 format!(
2483 r#" <member>
2484 <EndpointArn>{}</EndpointArn>
2485 <Attributes>
2486{attrs}
2487 </Attributes>
2488 </member>"#,
2489 ep.arn
2490 )
2491 })
2492 .collect::<Vec<_>>()
2493 .join("\n");
2494
2495 Ok(xml_resp(
2496 &format!(
2497 r#"<ListEndpointsByPlatformApplicationResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2498 <ListEndpointsByPlatformApplicationResult>
2499 <Endpoints>
2500{members}
2501 </Endpoints>
2502 </ListEndpointsByPlatformApplicationResult>
2503 <ResponseMetadata>
2504 <RequestId>{}</RequestId>
2505 </ResponseMetadata>
2506</ListEndpointsByPlatformApplicationResponse>"#,
2507 req.request_id
2508 ),
2509 &req.request_id,
2510 ))
2511 }
2512
2513 fn set_sms_attributes(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2516 let attrs = parse_entries(req, "attributes");
2517
2518 let mut state = self.state.write();
2519 for (k, v) in attrs {
2520 state.sms_attributes.insert(k, v);
2521 }
2522
2523 Ok(xml_resp(
2524 &format!(
2525 r#"<SetSMSAttributesResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2526 <SetSMSAttributesResult/>
2527 <ResponseMetadata>
2528 <RequestId>{}</RequestId>
2529 </ResponseMetadata>
2530</SetSMSAttributesResponse>"#,
2531 req.request_id
2532 ),
2533 &req.request_id,
2534 ))
2535 }
2536
2537 fn get_sms_attributes(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2538 let mut filter_names = Vec::new();
2540 for n in 1..=50 {
2541 let key = format!("attributes.member.{n}");
2542 if let Some(name) = req.query_params.get(&key) {
2543 filter_names.push(name.clone());
2544 } else {
2545 break;
2546 }
2547 }
2548
2549 let state = self.state.read();
2550
2551 let attrs: String = state
2552 .sms_attributes
2553 .iter()
2554 .filter(|(k, _)| filter_names.is_empty() || filter_names.contains(k))
2555 .map(|(k, v)| format_attr(k, v))
2556 .collect::<Vec<_>>()
2557 .join("\n");
2558
2559 Ok(xml_resp(
2560 &format!(
2561 r#"<GetSMSAttributesResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2562 <GetSMSAttributesResult>
2563 <attributes>
2564{attrs}
2565 </attributes>
2566 </GetSMSAttributesResult>
2567 <ResponseMetadata>
2568 <RequestId>{}</RequestId>
2569 </ResponseMetadata>
2570</GetSMSAttributesResponse>"#,
2571 req.request_id
2572 ),
2573 &req.request_id,
2574 ))
2575 }
2576
2577 fn check_if_phone_number_is_opted_out(
2578 &self,
2579 req: &AwsRequest,
2580 ) -> Result<AwsResponse, AwsServiceError> {
2581 let phone_number = required(req, "phoneNumber")?;
2582
2583 let valid = phone_number.starts_with('+')
2585 && phone_number.len() >= 2
2586 && phone_number[1..].chars().all(|c| c.is_ascii_digit());
2587 if !valid {
2588 return Err(AwsServiceError::aws_error(
2589 StatusCode::BAD_REQUEST,
2590 "InvalidParameter",
2591 format!(
2592 "Invalid parameter: PhoneNumber Reason: {phone_number} does not meet the E164 format"
2593 ),
2594 ));
2595 }
2596
2597 let state = self.state.read();
2598 let is_opted_out =
2600 state.opted_out_numbers.contains(&phone_number) || phone_number.ends_with("99");
2601
2602 Ok(xml_resp(
2603 &format!(
2604 r#"<CheckIfPhoneNumberIsOptedOutResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2605 <CheckIfPhoneNumberIsOptedOutResult>
2606 <isOptedOut>{is_opted_out}</isOptedOut>
2607 </CheckIfPhoneNumberIsOptedOutResult>
2608 <ResponseMetadata>
2609 <RequestId>{}</RequestId>
2610 </ResponseMetadata>
2611</CheckIfPhoneNumberIsOptedOutResponse>"#,
2612 req.request_id
2613 ),
2614 &req.request_id,
2615 ))
2616 }
2617
2618 fn list_phone_numbers_opted_out(
2619 &self,
2620 req: &AwsRequest,
2621 ) -> Result<AwsResponse, AwsServiceError> {
2622 let state = self.state.read();
2623 let members: String = state
2624 .opted_out_numbers
2625 .iter()
2626 .map(|n| format!(" <member>{n}</member>"))
2627 .collect::<Vec<_>>()
2628 .join("\n");
2629
2630 Ok(xml_resp(
2631 &format!(
2632 r#"<ListPhoneNumbersOptedOutResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2633 <ListPhoneNumbersOptedOutResult>
2634 <phoneNumbers>
2635{members}
2636 </phoneNumbers>
2637 </ListPhoneNumbersOptedOutResult>
2638 <ResponseMetadata>
2639 <RequestId>{}</RequestId>
2640 </ResponseMetadata>
2641</ListPhoneNumbersOptedOutResponse>"#,
2642 req.request_id
2643 ),
2644 &req.request_id,
2645 ))
2646 }
2647
2648 fn opt_in_phone_number(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2649 let phone_number = required(req, "phoneNumber")?;
2650 let mut state = self.state.write();
2651 state.opted_out_numbers.retain(|n| n != &phone_number);
2652
2653 Ok(xml_resp(
2654 &format!(
2655 r#"<OptInPhoneNumberResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2656 <OptInPhoneNumberResult/>
2657 <ResponseMetadata>
2658 <RequestId>{}</RequestId>
2659 </ResponseMetadata>
2660</OptInPhoneNumberResponse>"#,
2661 req.request_id
2662 ),
2663 &req.request_id,
2664 ))
2665 }
2666}
2667
/// Borrowed inputs consumed by `build_sns_lambda_event` to render the JSON event
/// document for a single SNS record.
pub(crate) struct SnsLambdaEventInput<'a> {
    /// Emitted as `Sns.MessageId`.
    pub message_id: &'a str,
    /// Emitted as `Sns.TopicArn`.
    pub topic_arn: &'a str,
    /// Emitted as `EventSubscriptionArn` and embedded in `Sns.UnsubscribeUrl`.
    pub subscription_arn: &'a str,
    /// Emitted as `Sns.Message`.
    pub message: &'a str,
    /// Emitted as `Sns.Subject`; `None` is rendered as an empty string.
    pub subject: Option<&'a str>,
    /// Emitted verbatim as `Sns.MessageAttributes`.
    pub message_attributes: &'a serde_json::Map<String, Value>,
    /// Emitted as `Sns.Timestamp` in RFC 3339 form.
    pub timestamp: &'a chrono::DateTime<Utc>,
    /// Base URL used as the prefix of `Sns.UnsubscribeUrl`.
    pub endpoint: &'a str,
}
2679
2680pub(crate) fn build_sns_lambda_event(input: &SnsLambdaEventInput<'_>) -> String {
2683 let sns_event = serde_json::json!({
2684 "Records": [{
2685 "EventVersion": "1.0",
2686 "EventSubscriptionArn": input.subscription_arn,
2687 "EventSource": "aws:sns",
2688 "Sns": {
2689 "SignatureVersion": "1",
2690 "Timestamp": input.timestamp.to_rfc3339(),
2691 "Signature": "FAKE_SIGNATURE",
2692 "SigningCertUrl": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-0000000000000000000000.pem",
2693 "MessageId": input.message_id,
2694 "Message": input.message,
2695 "MessageAttributes": input.message_attributes,
2696 "Type": "Notification",
2697 "UnsubscribeUrl": format!("{}/?Action=Unsubscribe&SubscriptionArn={}", input.endpoint, input.subscription_arn),
2698 "TopicArn": input.topic_arn,
2699 "Subject": input.subject.unwrap_or(""),
2700 }
2701 }]
2702 });
2703 sns_event.to_string()
2704}
2705
2706fn build_sns_envelope(
2709 message_id: &str,
2710 topic_arn: &str,
2711 subject: &Option<String>,
2712 message: &str,
2713 message_attributes: &serde_json::Map<String, Value>,
2714 endpoint: &str,
2715) -> String {
2716 let mut map = serde_json::Map::new();
2717 map.insert(
2718 "Type".to_string(),
2719 Value::String("Notification".to_string()),
2720 );
2721 map.insert(
2722 "MessageId".to_string(),
2723 Value::String(message_id.to_string()),
2724 );
2725 map.insert("TopicArn".to_string(), Value::String(topic_arn.to_string()));
2726 if let Some(ref subj) = subject {
2727 map.insert("Subject".to_string(), Value::String(subj.clone()));
2728 }
2729 map.insert("Message".to_string(), Value::String(message.to_string()));
2730 map.insert(
2731 "Timestamp".to_string(),
2732 Value::String(Utc::now().to_rfc3339()),
2733 );
2734 map.insert(
2735 "SignatureVersion".to_string(),
2736 Value::String("1".to_string()),
2737 );
2738 map.insert(
2739 "Signature".to_string(),
2740 Value::String("FAKE_SIGNATURE".to_string()),
2741 );
2742 map.insert(
2743 "SigningCertURL".to_string(),
2744 Value::String("https://sns.us-east-1.amazonaws.com/SimpleNotificationService-0000000000000000000000.pem".to_string()),
2745 );
2746 map.insert(
2747 "UnsubscribeURL".to_string(),
2748 Value::String(format!(
2749 "{}/?Action=Unsubscribe&SubscriptionArn={}",
2750 endpoint, topic_arn
2751 )),
2752 );
2753 if !message_attributes.is_empty() {
2754 map.insert(
2755 "MessageAttributes".to_string(),
2756 Value::Object(message_attributes.clone()),
2757 );
2758 }
2759 Value::Object(map).to_string()
2760}
2761
/// Renders one attribute as an XML `<entry>` element with `<key>` and `<value>`
/// children.
///
/// Both `name` and `value` are XML-escaped (`&`, `<`, `>`) before interpolation, so
/// an attribute containing markup characters cannot produce a malformed response
/// document. Callers must pass raw, unescaped text.
fn format_attr(name: &str, value: &str) -> String {
    /// Escapes the characters that are not allowed verbatim in XML character data.
    fn escape_xml_text(raw: &str) -> String {
        let mut escaped = String::with_capacity(raw.len());
        for ch in raw.chars() {
            match ch {
                '&' => escaped.push_str("&amp;"),
                '<' => escaped.push_str("&lt;"),
                '>' => escaped.push_str("&gt;"),
                other => escaped.push(other),
            }
        }
        escaped
    }

    format!(
        " <entry><key>{}</key><value>{}</value></entry>",
        escape_xml_text(name),
        escape_xml_text(value)
    )
}
2765
2766fn format_sub_member(sub: &SnsSubscription) -> String {
2767 let display_arn = if sub.confirmed {
2768 &sub.subscription_arn
2769 } else {
2770 "PendingConfirmation"
2771 };
2772 format!(
2773 r#" <member>
2774 <SubscriptionArn>{}</SubscriptionArn>
2775 <TopicArn>{}</TopicArn>
2776 <Protocol>{}</Protocol>
2777 <Endpoint>{}</Endpoint>
2778 <Owner>{}</Owner>
2779 </member>"#,
2780 display_arn, sub.topic_arn, sub.protocol, sub.endpoint, sub.owner,
2781 )
2782}
2783
/// Delivery targets for one topic, grouped by protocol, as produced by
/// `collect_topic_subscribers`.
struct TopicSubscribers {
    /// `sqs` subscriptions: `(endpoint, raw_message_delivery)`, where the flag is
    /// true only when the `RawMessageDelivery` attribute equals `"true"`.
    sqs: Vec<(String, bool)>,
    /// Endpoints of `http` and `https` subscriptions.
    http: Vec<String>,
    /// `lambda` subscriptions: `(endpoint, subscription_arn)`.
    lambda: Vec<(String, String)>,
    /// Endpoints of `email` and `email-json` subscriptions.
    email: Vec<String>,
    /// Endpoints of `sms` subscriptions.
    sms: Vec<String>,
}
2799
/// Borrowed per-message data bundled for topic fan-out.
///
/// NOTE(review): the code that consumes this struct is not visible in this part of
/// the file; the field notes below are inferred from names and types — confirm
/// against the fan-out implementation.
struct TopicFanoutContext<'a> {
    /// Presumably the id of the message being delivered.
    msg_id: &'a str,
    /// Presumably the ARN of the topic being published to.
    topic_arn: &'a str,
    /// Optional message subject.
    subject: Option<&'a str>,
    /// Presumably the service base URL used when building unsubscribe links.
    endpoint: &'a str,
    /// Presumably the body delivered to SQS subscribers — TODO confirm.
    sqs_message: &'a str,
    /// Presumably the body delivered to protocols without a specific override —
    /// TODO confirm.
    default_message: &'a str,
    /// Message attributes already converted to JSON form.
    envelope_attrs: &'a serde_json::Map<String, Value>,
    /// Message attributes in their parsed form.
    message_attributes: &'a HashMap<String, MessageAttribute>,
    /// Optional message group id — presumably for FIFO topics; verify.
    message_group_id: Option<&'a str>,
    /// Optional message deduplication id — presumably for FIFO topics; verify.
    message_dedup_id: Option<&'a str>,
}
2814
2815fn collect_topic_subscribers(
2816 state: &crate::state::SnsState,
2817 topic_arn: &str,
2818 message_attributes: &HashMap<String, MessageAttribute>,
2819 message: &str,
2820) -> TopicSubscribers {
2821 let confirmed_for_topic = |s: &&SnsSubscription| {
2822 s.topic_arn == topic_arn
2823 && s.confirmed
2824 && matches_filter_policy(s, message_attributes, message)
2825 };
2826
2827 let sqs = state
2828 .subscriptions
2829 .values()
2830 .filter(|s| s.protocol == "sqs")
2831 .filter(confirmed_for_topic)
2832 .map(|s| {
2833 let raw = s
2834 .attributes
2835 .get("RawMessageDelivery")
2836 .map(|v| v == "true")
2837 .unwrap_or(false);
2838 (s.endpoint.clone(), raw)
2839 })
2840 .collect();
2841
2842 let http = state
2843 .subscriptions
2844 .values()
2845 .filter(|s| s.protocol == "http" || s.protocol == "https")
2846 .filter(confirmed_for_topic)
2847 .map(|s| s.endpoint.clone())
2848 .collect();
2849
2850 let lambda = state
2851 .subscriptions
2852 .values()
2853 .filter(|s| s.protocol == "lambda")
2854 .filter(confirmed_for_topic)
2855 .map(|s| (s.endpoint.clone(), s.subscription_arn.clone()))
2856 .collect();
2857
2858 let email = state
2859 .subscriptions
2860 .values()
2861 .filter(|s| s.protocol == "email" || s.protocol == "email-json")
2862 .filter(confirmed_for_topic)
2863 .map(|s| s.endpoint.clone())
2864 .collect();
2865
2866 let sms = state
2867 .subscriptions
2868 .values()
2869 .filter(|s| s.protocol == "sms")
2870 .filter(confirmed_for_topic)
2871 .map(|s| s.endpoint.clone())
2872 .collect();
2873
2874 TopicSubscribers {
2875 sqs,
2876 http,
2877 lambda,
2878 email,
2879 sms,
2880 }
2881}
2882
2883fn build_envelope_attrs(
2886 message_attributes: &HashMap<String, MessageAttribute>,
2887) -> serde_json::Map<String, Value> {
2888 let mut envelope_attrs = serde_json::Map::new();
2889 for (key, attr) in message_attributes {
2890 let mut attr_obj = serde_json::Map::new();
2891 attr_obj.insert("Type".to_string(), Value::String(attr.data_type.clone()));
2892 if let Some(ref sv) = attr.string_value {
2893 attr_obj.insert("Value".to_string(), Value::String(sv.clone()));
2894 }
2895 if let Some(ref bv) = attr.binary_value {
2896 attr_obj.insert(
2897 "Value".to_string(),
2898 Value::String(base64::engine::general_purpose::STANDARD.encode(bv)),
2899 );
2900 }
2901 envelope_attrs.insert(key.clone(), Value::Object(attr_obj));
2902 }
2903 envelope_attrs
2904}
2905
2906fn parse_message_attributes(req: &AwsRequest) -> HashMap<String, MessageAttribute> {
2907 let mut attrs = HashMap::new();
2908 for n in 1..=10 {
2909 let name_key = format!("MessageAttributes.entry.{n}.Name");
2910 let data_type_key = format!("MessageAttributes.entry.{n}.Value.DataType");
2911 if let (Some(name), Some(data_type)) = (
2912 req.query_params.get(&name_key),
2913 req.query_params.get(&data_type_key),
2914 ) {
2915 let string_value_key = format!("MessageAttributes.entry.{n}.Value.StringValue");
2916 let string_value = req.query_params.get(&string_value_key).cloned();
2917 let binary_value_key = format!("MessageAttributes.entry.{n}.Value.BinaryValue");
2918 let binary_value = req
2919 .query_params
2920 .get(&binary_value_key)
2921 .and_then(|b| base64::engine::general_purpose::STANDARD.decode(b).ok());
2922 attrs.insert(
2923 name.clone(),
2924 MessageAttribute {
2925 data_type: data_type.clone(),
2926 string_value,
2927 binary_value,
2928 },
2929 );
2930 } else {
2931 break;
2932 }
2933 }
2934 attrs
2935}
2936
2937fn parse_batch_message_attributes(
2940 req: &AwsRequest,
2941 member_idx: usize,
2942) -> HashMap<String, MessageAttribute> {
2943 let mut attrs = HashMap::new();
2944 for n in 1..=10 {
2945 let prefix =
2946 format!("PublishBatchRequestEntries.member.{member_idx}.MessageAttributes.entry.{n}");
2947 let name_key = format!("{prefix}.Name");
2948 let data_type_key = format!("{prefix}.Value.DataType");
2949 if let (Some(name), Some(data_type)) = (
2950 req.query_params.get(&name_key),
2951 req.query_params.get(&data_type_key),
2952 ) {
2953 let sv_key = format!("{prefix}.Value.StringValue");
2954 let string_value = req.query_params.get(&sv_key).cloned();
2955 let bv_key = format!("{prefix}.Value.BinaryValue");
2956 let binary_value = req
2957 .query_params
2958 .get(&bv_key)
2959 .and_then(|b| base64::engine::general_purpose::STANDARD.decode(b).ok());
2960 attrs.insert(
2961 name.clone(),
2962 MessageAttribute {
2963 data_type: data_type.clone(),
2964 string_value,
2965 binary_value,
2966 },
2967 );
2968 } else {
2969 break;
2970 }
2971 }
2972 attrs
2973}
2974
2975fn parse_tags(req: &AwsRequest) -> Vec<(String, String)> {
2978 let mut tags = Vec::new();
2979 for n in 1..=100 {
2980 let key_param = format!("Tags.member.{n}.Key");
2981 let val_param = format!("Tags.member.{n}.Value");
2982 if let Some(key) = req.query_params.get(&key_param) {
2983 let value = req
2984 .query_params
2985 .get(&val_param)
2986 .cloned()
2987 .unwrap_or_default();
2988 tags.push((key.clone(), value));
2989 } else {
2990 break;
2991 }
2992 }
2993 tags
2994}
2995
2996fn parse_tag_keys(req: &AwsRequest) -> Vec<String> {
2999 let mut keys = Vec::new();
3000 for n in 1..=50 {
3001 let key_param = format!("TagKeys.member.{n}");
3002 if let Some(key) = req.query_params.get(&key_param) {
3003 keys.push(key.clone());
3004 } else {
3005 break;
3006 }
3007 }
3008 keys
3009}
3010
3011fn parse_entries(req: &AwsRequest, prefix: &str) -> HashMap<String, String> {
3013 let mut attrs = HashMap::new();
3014 for n in 1..=50 {
3015 let key_param = format!("{prefix}.entry.{n}.key");
3016 let val_param = format!("{prefix}.entry.{n}.value");
3017 if let Some(key) = req.query_params.get(&key_param) {
3018 let value = req
3019 .query_params
3020 .get(&val_param)
3021 .cloned()
3022 .unwrap_or_default();
3023 attrs.insert(key.clone(), value);
3024 } else {
3025 break;
3026 }
3027 }
3028 attrs
3029}
3030
3031fn validate_sms_endpoint(endpoint: &str) -> Result<(), AwsServiceError> {
3033 if endpoint.is_empty() {
3035 return Err(AwsServiceError::aws_error(
3036 StatusCode::BAD_REQUEST,
3037 "InvalidParameter",
3038 "Invalid parameter: Endpoint",
3039 ));
3040 }
3041
3042 let stripped = endpoint.strip_prefix('+').unwrap_or(endpoint);
3044 if stripped.is_empty() {
3045 return Err(AwsServiceError::aws_error(
3046 StatusCode::BAD_REQUEST,
3047 "InvalidParameter",
3048 format!("Invalid SMS endpoint: {endpoint}"),
3049 ));
3050 }
3051
3052 if !endpoint.starts_with('+') && !endpoint.starts_with(|c: char| c.is_ascii_digit()) {
3054 return Err(AwsServiceError::aws_error(
3055 StatusCode::BAD_REQUEST,
3056 "InvalidParameter",
3057 format!("Invalid SMS endpoint: {endpoint}"),
3058 ));
3059 }
3060
3061 if endpoint.ends_with('.') || endpoint.ends_with('-') || endpoint.ends_with('/') {
3063 return Err(AwsServiceError::aws_error(
3064 StatusCode::BAD_REQUEST,
3065 "InvalidParameter",
3066 format!("Invalid SMS endpoint: {endpoint}"),
3067 ));
3068 }
3069
3070 let chars: Vec<char> = endpoint.chars().collect();
3072 for i in 0..chars.len() - 1 {
3073 let c = chars[i];
3074 let next = chars[i + 1];
3075 if (c == '-' || c == '/' || c == '.') && (next == '-' || next == '/' || next == '.') {
3076 return Err(AwsServiceError::aws_error(
3077 StatusCode::BAD_REQUEST,
3078 "InvalidParameter",
3079 format!("Invalid SMS endpoint: {endpoint}"),
3080 ));
3081 }
3082 }
3083
3084 for c in stripped.chars() {
3086 if !c.is_ascii_digit() && c != '-' && c != '/' && c != '.' {
3087 return Err(AwsServiceError::aws_error(
3088 StatusCode::BAD_REQUEST,
3089 "InvalidParameter",
3090 format!("Invalid SMS endpoint: {endpoint}"),
3091 ));
3092 }
3093 }
3094
3095 Ok(())
3096}
3097
3098fn matches_filter_policy(
3100 sub: &SnsSubscription,
3101 message_attributes: &HashMap<String, MessageAttribute>,
3102 message_body: &str,
3103) -> bool {
3104 let filter_json = match sub.attributes.get("FilterPolicy") {
3105 Some(fp) if !fp.is_empty() => fp,
3106 _ => return true,
3107 };
3108
3109 let filter: HashMap<String, Value> = match serde_json::from_str(filter_json) {
3110 Ok(f) => f,
3111 Err(_) => return false,
3112 };
3113
3114 let scope = sub
3115 .attributes
3116 .get("FilterPolicyScope")
3117 .map(|s| s.as_str())
3118 .unwrap_or("MessageAttributes");
3119
3120 if scope == "MessageBody" {
3121 return matches_filter_policy_body(&filter, message_body);
3122 }
3123
3124 for (attr_name, allowed_values) in &filter {
3126 if attr_name == "$or" {
3128 if let Some(or_conditions) = allowed_values.as_array() {
3129 let any_match = or_conditions.iter().any(|condition| {
3130 if let Some(cond_obj) = condition.as_object() {
3131 let cond_map: HashMap<String, Value> = cond_obj
3132 .iter()
3133 .map(|(k, v)| (k.clone(), v.clone()))
3134 .collect();
3135 cond_map.iter().all(|(key, vals)| {
3137 if let Some(arr) = vals.as_array() {
3138 if let Some(msg_attr) = message_attributes.get(key) {
3139 let val = msg_attr.string_value.as_deref().unwrap_or("");
3140 check_filter_values(arr, val)
3141 } else {
3142 false
3143 }
3144 } else {
3145 false
3146 }
3147 })
3148 } else {
3149 false
3150 }
3151 });
3152 if !any_match {
3153 return false;
3154 }
3155 continue;
3156 }
3157 }
3158
3159 let allowed = match allowed_values.as_array() {
3160 Some(arr) => arr,
3161 None => continue,
3162 };
3163
3164 let msg_attr = match message_attributes.get(attr_name) {
3165 Some(a) => a,
3166 None => {
3167 let has_exists_false = allowed.iter().any(|v| {
3168 v.as_object()
3169 .and_then(|o| o.get("exists"))
3170 .and_then(|e| e.as_bool())
3171 == Some(false)
3172 });
3173 if has_exists_false {
3174 continue;
3175 }
3176 return false;
3177 }
3178 };
3179
3180 let attr_value = msg_attr.string_value.as_deref().unwrap_or("");
3181 let is_numeric_type = msg_attr.data_type == "Number";
3182
3183 if msg_attr.data_type.starts_with("String.Array") || msg_attr.data_type == "String.Array" {
3185 if let Ok(arr) = serde_json::from_str::<Vec<Value>>(attr_value) {
3186 let any_match = arr.iter().any(|elem| {
3187 let elem_str = match elem {
3188 Value::String(s) => s.clone(),
3189 Value::Number(n) => n.to_string(),
3190 _ => elem.to_string(),
3191 };
3192 check_filter_values(allowed, &elem_str)
3193 });
3194 if !any_match {
3195 return false;
3196 }
3197 continue;
3198 }
3199 }
3200
3201 let matched = check_filter_values_typed(allowed, attr_value, Some(is_numeric_type));
3202 if !matched {
3203 return false;
3204 }
3205 }
3206
3207 true
3208}
3209
3210fn matches_filter_policy_body(filter: &HashMap<String, Value>, message_body: &str) -> bool {
3212 let body: Value = match serde_json::from_str(message_body) {
3213 Ok(v) => v,
3214 Err(_) => return false,
3215 };
3216
3217 matches_filter_policy_nested(filter, &body)
3218}
3219
3220fn matches_filter_policy_nested(filter: &HashMap<String, Value>, body: &Value) -> bool {
3221 let body_obj = match body.as_object() {
3222 Some(o) => o,
3223 None => return false,
3224 };
3225
3226 for (key, filter_value) in filter {
3227 let body_value = match body_obj.get(key) {
3228 Some(v) => v,
3229 None => {
3230 if let Some(arr) = filter_value.as_array() {
3232 let has_exists_false = arr.iter().any(|v| {
3233 v.as_object()
3234 .and_then(|o| o.get("exists"))
3235 .and_then(|e| e.as_bool())
3236 == Some(false)
3237 });
3238 if has_exists_false {
3239 continue;
3240 }
3241 }
3242 return false;
3243 }
3244 };
3245
3246 if let Some(arr) = filter_value.as_array() {
3247 if let Some(body_arr) = body_value.as_array() {
3250 let any_match = body_arr.iter().any(|elem| {
3251 let is_elem_numeric = elem.is_number();
3252 let elem_str = match elem {
3253 Value::String(s) => s.clone(),
3254 Value::Number(n) => n.to_string(),
3255 Value::Bool(b) => b.to_string(),
3256 Value::Null => "null".to_string(),
3257 _ => elem.to_string(),
3258 };
3259 check_filter_values_typed(arr, &elem_str, Some(is_elem_numeric))
3260 });
3261 if !any_match {
3262 return false;
3263 }
3264 } else {
3265 let is_body_numeric = body_value.is_number();
3266 let value_str = match body_value {
3267 Value::String(s) => s.clone(),
3268 Value::Number(n) => n.to_string(),
3269 Value::Bool(b) => b.to_string(),
3270 Value::Null => "null".to_string(),
3271 _ => body_value.to_string(),
3272 };
3273 if !check_filter_values_typed(arr, &value_str, Some(is_body_numeric)) {
3274 return false;
3275 }
3276 }
3277 } else if let Some(nested_filter) = filter_value.as_object() {
3278 let nested_map: HashMap<String, Value> = nested_filter
3280 .iter()
3281 .map(|(k, v)| (k.clone(), v.clone()))
3282 .collect();
3283 if let Some(body_arr) = body_value.as_array() {
3285 let any_match = body_arr
3286 .iter()
3287 .any(|elem| matches_filter_policy_nested(&nested_map, elem));
3288 if !any_match {
3289 return false;
3290 }
3291 } else if !matches_filter_policy_nested(&nested_map, body_value) {
3292 return false;
3293 }
3294 }
3295 }
3296
3297 true
3298}
3299
3300fn check_filter_values(allowed: &[Value], attr_value: &str) -> bool {
3303 check_filter_values_typed(allowed, attr_value, None)
3304}
3305
3306fn check_filter_values_typed(
3309 allowed: &[Value],
3310 attr_value: &str,
3311 is_numeric: Option<bool>,
3312) -> bool {
3313 allowed.iter().any(|v| match v {
3314 Value::String(s) => {
3315 if is_numeric == Some(true) {
3317 false
3318 } else {
3319 s == attr_value
3320 }
3321 }
3322 Value::Number(n) => {
3323 if is_numeric == Some(false) {
3325 return false;
3326 }
3327 if let Ok(attr_num) = attr_value.parse::<f64>() {
3328 if let Some(filter_num) = n.as_f64() {
3329 numbers_equal(attr_num, filter_num)
3330 } else {
3331 false
3332 }
3333 } else {
3334 false
3335 }
3336 }
3337 Value::Bool(_) | Value::Null => false,
3338 Value::Object(obj) => {
3339 if let Some(prefix) = obj.get("prefix").and_then(|v| v.as_str()) {
3340 attr_value.starts_with(prefix)
3341 } else if let Some(suffix) = obj.get("suffix").and_then(|v| v.as_str()) {
3342 attr_value.ends_with(suffix)
3343 } else if let Some(anything_but) = obj.get("anything-but") {
3344 match anything_but {
3345 Value::String(s) => {
3346 if is_numeric == Some(true) {
3348 true
3349 } else {
3350 attr_value != s
3351 }
3352 }
3353 Value::Number(n) => {
3354 if is_numeric == Some(false) {
3356 return true;
3357 }
3358 if let Ok(attr_num) = attr_value.parse::<f64>() {
3359 if let Some(filter_num) = n.as_f64() {
3360 (attr_num - filter_num).abs() >= f64::EPSILON
3361 } else {
3362 true
3363 }
3364 } else {
3365 true
3366 }
3367 }
3368 Value::Array(arr) => {
3369 !arr.iter().any(|av| match av {
3371 Value::String(s) => {
3372 if is_numeric == Some(true) {
3373 false
3374 } else {
3375 s == attr_value
3376 }
3377 }
3378 Value::Number(n) => {
3379 if is_numeric == Some(false) {
3380 return false;
3381 }
3382 if let Ok(attr_num) = attr_value.parse::<f64>() {
3383 if let Some(filter_num) = n.as_f64() {
3384 numbers_equal(attr_num, filter_num)
3385 } else {
3386 false
3387 }
3388 } else {
3389 false
3390 }
3391 }
3392 _ => false,
3393 })
3394 }
3395 Value::Object(inner) => {
3396 if let Some(prefix) = inner.get("prefix").and_then(|v| v.as_str()) {
3398 !attr_value.starts_with(prefix)
3399 } else if let Some(suffix) = inner.get("suffix").and_then(|v| v.as_str()) {
3400 !attr_value.ends_with(suffix)
3401 } else {
3402 false
3403 }
3404 }
3405 _ => false,
3406 }
3407 } else if let Some(numeric_arr) = obj.get("numeric").and_then(|v| v.as_array()) {
3408 let attr_num: f64 = match attr_value.parse() {
3409 Ok(n) => n,
3410 Err(_) => return false,
3411 };
3412 matches_numeric_filter(attr_num, numeric_arr)
3413 } else if let Some(eq_ignore_case) =
3414 obj.get("equals-ignore-case").and_then(|v| v.as_str())
3415 {
3416 attr_value.eq_ignore_ascii_case(eq_ignore_case)
3417 } else {
3418 obj.get("exists")
3420 .and_then(|v| v.as_bool())
3421 .unwrap_or_default()
3422 }
3423 }
3424 _ => false,
3425 })
3426}
3427
/// Compares two numbers for equality within an absolute tolerance of `1e-5`.
///
/// The comparison is strict: a difference of exactly the tolerance is not equal, and
/// any comparison involving NaN is false.
fn numbers_equal(a: f64, b: f64) -> bool {
    const TOLERANCE: f64 = 1e-5;
    let difference = (a - b).abs();
    difference < TOLERANCE
}
3434
3435fn matches_numeric_filter(value: f64, conditions: &[Value]) -> bool {
3437 let mut i = 0;
3438 while i + 1 < conditions.len() {
3439 let op = match conditions[i].as_str() {
3440 Some(s) => s,
3441 None => return false,
3442 };
3443 let threshold = match conditions[i + 1].as_f64() {
3444 Some(n) => n,
3445 None => return false,
3446 };
3447 let passes = match op {
3448 "=" => numbers_equal(value, threshold),
3449 ">" => value > threshold,
3450 ">=" => value >= threshold,
3451 "<" => value < threshold,
3452 "<=" => value <= threshold,
3453 _ => return false,
3454 };
3455 if !passes {
3456 return false;
3457 }
3458 i += 2;
3459 }
3460 true
3461}
3462
3463fn validate_filter_policy(policy_str: &str) -> Result<(), AwsServiceError> {
3465 let policy: HashMap<String, Value> = serde_json::from_str(policy_str).map_err(|_| {
3466 AwsServiceError::aws_error(
3467 StatusCode::BAD_REQUEST,
3468 "InvalidParameter",
3469 "Invalid parameter: FilterPolicy: failed to parse JSON.",
3470 )
3471 })?;
3472
3473 let mut total_values = 0;
3475 for (key, value) in &policy {
3476 if key.starts_with('$') {
3478 continue;
3479 }
3480 if let Some(arr) = value.as_array() {
3481 total_values += arr.len();
3482 for item in arr {
3483 validate_filter_policy_value(item)?;
3484 }
3485 }
3486 }
3487 if total_values > 150 {
3488 return Err(AwsServiceError::aws_error(
3489 StatusCode::BAD_REQUEST,
3490 "InvalidParameter",
3491 "Invalid parameter: FilterPolicy: Filter policy is too complex",
3492 ));
3493 }
3494
3495 Ok(())
3496}
3497
/// Keys accepted inside an object-valued filter rule. `validate_filter_policy_value`
/// rejects any object containing a key outside this list.
const VALID_FILTER_MATCH_TYPES: &[&str] = &[
    "exists",
    "prefix",
    "suffix",
    "anything-but",
    "numeric",
    "equals-ignore-case",
];
3507
3508fn validate_filter_policy_value(value: &Value) -> Result<(), AwsServiceError> {
3510 match value {
3511 Value::String(_) | Value::Bool(_) | Value::Null => Ok(()),
3512 Value::Number(n) => {
3513 if let Some(f) = n.as_f64() {
3515 if f.abs() >= 1_000_000_000.0 {
3516 return Err(AwsServiceError::aws_error(
3517 StatusCode::INTERNAL_SERVER_ERROR,
3518 "InternalError",
3519 format!(
3520 "Invalid parameter: FilterPolicy: Match value {} must be smaller than 1E9",
3521 n
3522 ),
3523 ));
3524 }
3525 }
3526 Ok(())
3527 }
3528 Value::Array(_) => Err(AwsServiceError::aws_error(
3529 StatusCode::BAD_REQUEST,
3530 "InvalidParameter",
3531 "Invalid parameter: FilterPolicy: Match value must be String, number, true, false, or null",
3532 )),
3533 Value::Object(obj) => {
3534 if let Some(exists_val) = obj.get("exists") {
3535 if !exists_val.is_boolean() {
3536 return Err(AwsServiceError::aws_error(
3537 StatusCode::BAD_REQUEST,
3538 "InvalidParameter",
3539 "Invalid parameter: FilterPolicy: exists match pattern must be either true or false.",
3540 ));
3541 }
3542 }
3543 for key in obj.keys() {
3545 if !VALID_FILTER_MATCH_TYPES.contains(&key.as_str()) {
3546 return Err(AwsServiceError::aws_error(
3547 StatusCode::BAD_REQUEST,
3548 "InvalidParameter",
3549 format!(
3550 "Invalid parameter: FilterPolicy: Unrecognized match type {key}"
3551 ),
3552 ));
3553 }
3554 }
3555 if let Some(numeric_val) = obj.get("numeric") {
3557 if let Some(arr) = numeric_val.as_array() {
3558 validate_numeric_filter(arr)?;
3559 }
3560 }
3561 Ok(())
3562 }
3563 }
3564}
3565
/// Operators accepted as the first element of a `numeric` filter rule; checked by
/// `validate_numeric_filter`.
const VALID_NUMERIC_OPS: &[&str] = &["=", "<", "<=", ">", ">="];
/// Operators that express a lower bound.
/// NOTE(review): usage is not visible in this part of the file — presumably used
/// when validating two-sided numeric ranges; confirm.
const LOWER_OPS: &[&str] = &[">", ">="];
/// Operators that express an upper bound.
/// NOTE(review): usage is not visible in this part of the file — presumably used
/// when validating two-sided numeric ranges; confirm.
const UPPER_OPS: &[&str] = &["<", "<="];
3569
3570fn validate_numeric_filter(arr: &[Value]) -> Result<(), AwsServiceError> {
3571 if arr.is_empty() {
3573 return Err(AwsServiceError::aws_error(
3574 StatusCode::BAD_REQUEST,
3575 "InvalidParameter",
3576 "Invalid parameter: Attributes Reason: FilterPolicy: Invalid member in numeric match: ]\n at ...",
3577 ));
3578 }
3579
3580 let first_op = match arr[0].as_str() {
3582 Some(s) => s,
3583 None => {
3584 return Err(AwsServiceError::aws_error(
3585 StatusCode::BAD_REQUEST,
3586 "InvalidParameter",
3587 format!(
3588 "Invalid parameter: Attributes Reason: FilterPolicy: Invalid member in numeric match: {}\n at ...",
3589 arr[0]
3590 ),
3591 ));
3592 }
3593 };
3594
3595 if !VALID_NUMERIC_OPS.contains(&first_op) {
3597 return Err(AwsServiceError::aws_error(
3598 StatusCode::BAD_REQUEST,
3599 "InvalidParameter",
3600 format!(
3601 "Invalid parameter: Attributes Reason: FilterPolicy: Unrecognized numeric range operator: {first_op}\n at ..."
3602 ),
3603 ));
3604 }
3605
3606 if arr.len() < 2 {
3608 return Err(AwsServiceError::aws_error(
3609 StatusCode::BAD_REQUEST,
3610 "InvalidParameter",
3611 format!(
3612 "Invalid parameter: Attributes Reason: FilterPolicy: Value of {first_op} must be numeric\n at ..."
3613 ),
3614 ));
3615 }
3616
3617 if !arr[1].is_number() {
3619 return Err(AwsServiceError::aws_error(
3620 StatusCode::BAD_REQUEST,
3621 "InvalidParameter",
3622 format!(
3623 "Invalid parameter: Attributes Reason: FilterPolicy: Value of {first_op} must be numeric\n at ..."
3624 ),
3625 ));
3626 }
3627
3628 if let Some(f) = arr[1].as_f64() {
3630 if f.abs() >= 1_000_000_000.0 {
3631 return Err(AwsServiceError::aws_error(
3632 StatusCode::BAD_REQUEST,
3633 "InvalidParameter",
3634 format!(
3635 "Invalid parameter: FilterPolicy: Numeric match value must be smaller than 1E9, got {}",
3636 arr[1]
3637 ),
3638 ));
3639 }
3640 }
3641
3642 if arr.len() == 2 {
3644 return Ok(());
3645 }
3646
3647 if arr.len() > 4 {
3649 return Err(AwsServiceError::aws_error(
3650 StatusCode::BAD_REQUEST,
3651 "InvalidParameter",
3652 "Invalid parameter: Attributes Reason: FilterPolicy: Too many elements in numeric expression\n at ...",
3653 ));
3654 }
3655
3656 if arr.len() < 4 {
3657 if let Some(op2) = arr[2].as_str() {
3659 return Err(AwsServiceError::aws_error(
3660 StatusCode::BAD_REQUEST,
3661 "InvalidParameter",
3662 format!(
3663 "Invalid parameter: Attributes Reason: FilterPolicy: Value of {op2} must be numeric\n at ..."
3664 ),
3665 ));
3666 }
3667 return Err(AwsServiceError::aws_error(
3668 StatusCode::BAD_REQUEST,
3669 "InvalidParameter",
3670 "Invalid parameter: Attributes Reason: FilterPolicy: Too many elements in numeric expression\n at ...",
3671 ));
3672 }
3673
3674 let second_op = match arr[2].as_str() {
3676 Some(s) => s,
3677 None => {
3678 return Err(AwsServiceError::aws_error(
3679 StatusCode::BAD_REQUEST,
3680 "InvalidParameter",
3681 format!(
3682 "Invalid parameter: Attributes Reason: FilterPolicy: Invalid member in numeric match: {}\n at ...",
3683 arr[2]
3684 ),
3685 ));
3686 }
3687 };
3688
3689 if !arr[3].is_number() {
3690 return Err(AwsServiceError::aws_error(
3691 StatusCode::BAD_REQUEST,
3692 "InvalidParameter",
3693 format!(
3694 "Invalid parameter: Attributes Reason: FilterPolicy: Value of {second_op} must be numeric\n at ..."
3695 ),
3696 ));
3697 }
3698
3699 if let Some(f) = arr[3].as_f64() {
3701 if f.abs() >= 1_000_000_000.0 {
3702 return Err(AwsServiceError::aws_error(
3703 StatusCode::BAD_REQUEST,
3704 "InvalidParameter",
3705 format!(
3706 "Invalid parameter: FilterPolicy: Numeric match value must be smaller than 1E9, got {}",
3707 arr[3]
3708 ),
3709 ));
3710 }
3711 }
3712
3713 let first_is_lower = LOWER_OPS.contains(&first_op);
3715 let second_is_upper = UPPER_OPS.contains(&second_op);
3716
3717 if first_is_lower && !second_is_upper {
3718 return Err(AwsServiceError::aws_error(
3719 StatusCode::BAD_REQUEST,
3720 "InvalidParameter",
3721 format!(
3722 "Invalid parameter: Attributes Reason: FilterPolicy: Bad numeric range operator: {second_op}\n at ..."
3723 ),
3724 ));
3725 }
3726
3727 if !first_is_lower {
3728 return Err(AwsServiceError::aws_error(
3729 StatusCode::BAD_REQUEST,
3730 "InvalidParameter",
3731 "Invalid parameter: Attributes Reason: FilterPolicy: Too many elements in numeric expression\n at ...",
3732 ));
3733 }
3734
3735 let bottom = arr[1].as_f64().unwrap_or(0.0);
3737 let top = arr[3].as_f64().unwrap_or(0.0);
3738 if bottom >= top {
3739 return Err(AwsServiceError::aws_error(
3740 StatusCode::BAD_REQUEST,
3741 "InvalidParameter",
3742 "Invalid parameter: Attributes Reason: FilterPolicy: Bottom must be less than top\n at ...",
3743 ));
3744 }
3745
3746 Ok(())
3747}
3748
3749#[cfg(test)]
3750mod tests {
3751 use super::*;
3752
3753 #[test]
3754 fn validate_message_structure_json_rejects_invalid_json() {
3755 let result = validate_message_structure_json("not valid json");
3756 assert!(result.is_err());
3757 let err = result.unwrap_err();
3758 let msg = format!("{err}");
3759 assert!(msg.contains("No JSON message body is parseable"), "{msg}");
3760 }
3761
3762 #[test]
3763 fn validate_message_structure_json_rejects_missing_default_key() {
3764 let result = validate_message_structure_json(r#"{"sqs": "hello"}"#);
3765 assert!(result.is_err());
3766 let err = result.unwrap_err();
3767 let msg = format!("{err}");
3768 assert!(
3769 msg.contains("No default entry in JSON message body"),
3770 "{msg}"
3771 );
3772 }
3773
3774 #[test]
3775 fn validate_message_structure_json_accepts_valid() {
3776 let result =
3777 validate_message_structure_json(r#"{"default": "hello", "sqs": "hello from sqs"}"#);
3778 assert!(result.is_ok());
3779 }
3780
3781 #[test]
3782 fn build_sns_lambda_event_uses_real_subscription_arn() {
3783 let now = Utc::now();
3784 let sub_arn = "arn:aws:sns:us-east-1:123456789012:my-topic:abc-def-123";
3785 let topic_arn = "arn:aws:sns:us-east-1:123456789012:my-topic";
3786 let attrs = serde_json::Map::new();
3787
3788 let payload = build_sns_lambda_event(&SnsLambdaEventInput {
3789 message_id: "msg-001",
3790 topic_arn,
3791 subscription_arn: sub_arn,
3792 message: "hello",
3793 subject: Some("test subject"),
3794 message_attributes: &attrs,
3795 timestamp: &now,
3796 endpoint: "http://localhost:4566",
3797 });
3798
3799 let parsed: Value = serde_json::from_str(&payload).unwrap();
3800 let record = &parsed["Records"][0];
3801 assert_eq!(record["EventSubscriptionArn"], sub_arn);
3802 assert_eq!(record["EventSource"], "aws:sns");
3803 assert_eq!(record["Sns"]["TopicArn"], topic_arn);
3804 assert_eq!(record["Sns"]["Message"], "hello");
3805 assert_eq!(record["Sns"]["Subject"], "test subject");
3806 assert_eq!(record["Sns"]["MessageId"], "msg-001");
3807 let unsub_url = record["Sns"]["UnsubscribeUrl"].as_str().unwrap();
3809 assert!(
3810 unsub_url.contains(sub_arn),
3811 "UnsubscribeUrl should contain subscription ARN"
3812 );
3813 }
3814
3815 #[test]
3816 fn build_sns_envelope_uses_configured_endpoint() {
3817 let endpoint = "http://myhost:5555";
3818 let topic_arn = "arn:aws:sns:us-east-1:123456789012:my-topic";
3819 let attrs = serde_json::Map::new();
3820
3821 let envelope = build_sns_envelope(
3822 "msg-002",
3823 topic_arn,
3824 &None,
3825 "test message",
3826 &attrs,
3827 endpoint,
3828 );
3829
3830 let parsed: Value = serde_json::from_str(&envelope).unwrap();
3831 let unsub_url = parsed["UnsubscribeURL"].as_str().unwrap();
3832 assert!(
3833 unsub_url.starts_with("http://myhost:5555/"),
3834 "UnsubscribeURL should use the configured endpoint, got: {unsub_url}"
3835 );
3836 assert!(
3837 unsub_url.contains(topic_arn),
3838 "UnsubscribeURL should contain topic ARN"
3839 );
3840 }
3841
3842 #[test]
3843 fn build_sns_lambda_event_uses_configured_endpoint() {
3844 let now = Utc::now();
3845 let sub_arn = "arn:aws:sns:us-east-1:123456789012:my-topic:abc-def-123";
3846 let attrs = serde_json::Map::new();
3847 let endpoint = "http://custom:9999";
3848
3849 let payload = build_sns_lambda_event(&SnsLambdaEventInput {
3850 message_id: "msg-003",
3851 topic_arn: "arn:aws:sns:us-east-1:123456789012:my-topic",
3852 subscription_arn: sub_arn,
3853 message: "hello",
3854 subject: None,
3855 message_attributes: &attrs,
3856 timestamp: &now,
3857 endpoint,
3858 });
3859
3860 let parsed: Value = serde_json::from_str(&payload).unwrap();
3861 let unsub_url = parsed["Records"][0]["Sns"]["UnsubscribeUrl"]
3862 .as_str()
3863 .unwrap();
3864 assert!(
3865 unsub_url.starts_with("http://custom:9999/"),
3866 "UnsubscribeUrl should use configured endpoint, got: {unsub_url}"
3867 );
3868 }
3869
3870 #[test]
3871 fn add_permission_with_invalid_policy_returns_error_not_panic() {
3872 use fakecloud_core::delivery::DeliveryBus;
3873 use parking_lot::RwLock;
3874 use std::sync::Arc;
3875
3876 let state = Arc::new(RwLock::new(crate::state::SnsState::new(
3877 "123456789012",
3878 "us-east-1",
3879 "http://localhost:4566",
3880 )));
3881 let delivery = Arc::new(DeliveryBus::new());
3882 let svc = SnsService::new(state.clone(), delivery);
3883
3884 let topic_arn = "arn:aws:sns:us-east-1:123456789012:test-topic";
3886 {
3887 let mut s = state.write();
3888 s.topics.insert(
3889 topic_arn.to_string(),
3890 crate::state::SnsTopic {
3891 topic_arn: topic_arn.to_string(),
3892 name: "test-topic".to_string(),
3893 attributes: {
3894 let mut m = std::collections::HashMap::new();
3895 m.insert("Policy".to_string(), "not valid json {{{".to_string());
3897 m
3898 },
3899 is_fifo: false,
3900 tags: vec![],
3901 created_at: Utc::now(),
3902 },
3903 );
3904 }
3905
3906 let body = format!(
3908 "Action=AddPermission&TopicArn={}&Label=TestLabel&ActionName.member.1=Publish&AWSAccountId.member.1=111111111111",
3909 topic_arn
3910 );
3911 let req = fakecloud_core::service::AwsRequest {
3912 service: "sns".to_string(),
3913 action: "AddPermission".to_string(),
3914 region: "us-east-1".to_string(),
3915 account_id: "123456789012".to_string(),
3916 request_id: "test-req".to_string(),
3917 headers: http::HeaderMap::new(),
3918 query_params: std::collections::HashMap::new(),
3919 body: body.into(),
3920 path_segments: vec![],
3921 raw_path: "/".to_string(),
3922 raw_query: String::new(),
3923 method: http::Method::POST,
3924 is_query_protocol: true,
3925 access_key_id: None,
3926 principal: None,
3927 };
3928
3929 let result = svc.add_permission(&req);
3931 assert!(
3932 result.is_err(),
3933 "Invalid policy JSON should return error, not panic"
3934 );
3935 }
3936
3937 fn make_sns() -> (SnsService, crate::state::SharedSnsState) {
3940 use fakecloud_core::delivery::DeliveryBus;
3941 use parking_lot::RwLock;
3942 use std::sync::Arc;
3943
3944 let state = Arc::new(RwLock::new(crate::state::SnsState::new(
3945 "123456789012",
3946 "us-east-1",
3947 "http://localhost:4566",
3948 )));
3949 let delivery = Arc::new(DeliveryBus::new());
3950 let svc = SnsService::new(state.clone(), delivery);
3951 (svc, state)
3952 }
3953
3954 fn sns_request(action: &str, params: Vec<(&str, &str)>) -> fakecloud_core::service::AwsRequest {
3955 let mut query_params = std::collections::HashMap::new();
3956 query_params.insert("Action".to_string(), action.to_string());
3957 for (k, v) in params {
3958 query_params.insert(k.to_string(), v.to_string());
3959 }
3960 fakecloud_core::service::AwsRequest {
3961 service: "sns".to_string(),
3962 action: action.to_string(),
3963 region: "us-east-1".to_string(),
3964 account_id: "123456789012".to_string(),
3965 request_id: "test-req".to_string(),
3966 headers: http::HeaderMap::new(),
3967 query_params,
3968 body: Default::default(),
3969 path_segments: vec![],
3970 raw_path: "/".to_string(),
3971 raw_query: String::new(),
3972 method: http::Method::POST,
3973 is_query_protocol: true,
3974 access_key_id: None,
3975 principal: None,
3976 }
3977 }
3978
3979 fn assert_ok(result: &Result<AwsResponse, AwsServiceError>) {
3980 assert!(
3981 result.is_ok(),
3982 "Expected Ok, got: {:?}",
3983 result.as_ref().err()
3984 );
3985 }
3986
3987 fn response_body(result: &Result<AwsResponse, AwsServiceError>) -> String {
3988 String::from_utf8(result.as_ref().unwrap().body.expect_bytes().to_vec()).unwrap()
3989 }
3990
3991 #[test]
3994 fn subscribe_creates_subscription() {
3995 let (svc, _state) = make_sns();
3996 let req = sns_request("CreateTopic", vec![("Name", "my-topic")]);
3998 assert_ok(&svc.create_topic(&req));
3999
4000 let topic_arn = "arn:aws:sns:us-east-1:123456789012:my-topic";
4001 let req = sns_request(
4002 "Subscribe",
4003 vec![
4004 ("TopicArn", topic_arn),
4005 ("Protocol", "email"),
4006 ("Endpoint", "user@example.com"),
4007 ],
4008 );
4009 let result = svc.subscribe(&req);
4010 assert_ok(&result);
4011 let body = response_body(&result);
4012 assert!(
4013 body.contains("<SubscriptionArn>"),
4014 "Response should contain SubscriptionArn"
4015 );
4016 assert!(
4017 body.contains(topic_arn),
4018 "SubscriptionArn should contain topic ARN"
4019 );
4020 }
4021
4022 #[test]
4023 fn subscribe_duplicate_returns_existing_arn() {
4024 let (svc, _state) = make_sns();
4025 let req = sns_request("CreateTopic", vec![("Name", "dup-topic")]);
4026 assert_ok(&svc.create_topic(&req));
4027
4028 let topic_arn = "arn:aws:sns:us-east-1:123456789012:dup-topic";
4029 let params = vec![
4030 ("TopicArn", topic_arn),
4031 ("Protocol", "email"),
4032 ("Endpoint", "user@example.com"),
4033 ];
4034 let r1 = svc.subscribe(&sns_request("Subscribe", params.clone()));
4035 assert_ok(&r1);
4036 let body1 = response_body(&r1);
4037
4038 let r2 = svc.subscribe(&sns_request("Subscribe", params));
4039 assert_ok(&r2);
4040 let body2 = response_body(&r2);
4041
4042 assert_eq!(body1, body2, "Duplicate subscribe should return same ARN");
4044 }
4045
4046 #[test]
4047 fn unsubscribe_removes_subscription() {
4048 let (svc, state) = make_sns();
4049 let req = sns_request("CreateTopic", vec![("Name", "unsub-topic")]);
4050 assert_ok(&svc.create_topic(&req));
4051
4052 let topic_arn = "arn:aws:sns:us-east-1:123456789012:unsub-topic";
4053 let req = sns_request(
4054 "Subscribe",
4055 vec![
4056 ("TopicArn", topic_arn),
4057 ("Protocol", "email"),
4058 ("Endpoint", "user@example.com"),
4059 ],
4060 );
4061 assert_ok(&svc.subscribe(&req));
4062
4063 let sub_arn = {
4065 let s = state.read();
4066 s.subscriptions.keys().next().unwrap().clone()
4067 };
4068
4069 let req = sns_request("Unsubscribe", vec![("SubscriptionArn", &sub_arn)]);
4070 assert_ok(&svc.unsubscribe(&req));
4071
4072 let s = state.read();
4073 assert!(s.subscriptions.is_empty(), "Subscription should be removed");
4074 }
4075
4076 #[test]
4077 fn list_subscriptions_returns_all() {
4078 let (svc, _state) = make_sns();
4079 let req = sns_request("CreateTopic", vec![("Name", "list-topic")]);
4080 assert_ok(&svc.create_topic(&req));
4081
4082 let topic_arn = "arn:aws:sns:us-east-1:123456789012:list-topic";
4083 for i in 0..3 {
4084 let email = format!("user{}@example.com", i);
4085 let req = sns_request(
4086 "Subscribe",
4087 vec![
4088 ("TopicArn", topic_arn),
4089 ("Protocol", "email"),
4090 ("Endpoint", &email),
4091 ],
4092 );
4093 assert_ok(&svc.subscribe(&req));
4094 }
4095
4096 let req = sns_request("ListSubscriptions", vec![]);
4097 let result = svc.list_subscriptions(&req);
4098 assert_ok(&result);
4099 let body = response_body(&result);
4100 let count = body.matches("<member>").count();
4102 assert_eq!(count, 3, "Should list 3 subscriptions, found {}", count);
4103 }
4104
4105 #[test]
4106 fn list_subscriptions_by_topic_filters_correctly() {
4107 let (svc, _state) = make_sns();
4108 assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "topicA")])));
4110 assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "topicB")])));
4111
4112 let arn_a = "arn:aws:sns:us-east-1:123456789012:topicA";
4113 let arn_b = "arn:aws:sns:us-east-1:123456789012:topicB";
4114
4115 assert_ok(&svc.subscribe(&sns_request(
4117 "Subscribe",
4118 vec![
4119 ("TopicArn", arn_a),
4120 ("Protocol", "email"),
4121 ("Endpoint", "a1@example.com"),
4122 ],
4123 )));
4124 assert_ok(&svc.subscribe(&sns_request(
4125 "Subscribe",
4126 vec![
4127 ("TopicArn", arn_a),
4128 ("Protocol", "email"),
4129 ("Endpoint", "a2@example.com"),
4130 ],
4131 )));
4132 assert_ok(&svc.subscribe(&sns_request(
4133 "Subscribe",
4134 vec![
4135 ("TopicArn", arn_b),
4136 ("Protocol", "email"),
4137 ("Endpoint", "b1@example.com"),
4138 ],
4139 )));
4140
4141 let req = sns_request("ListSubscriptionsByTopic", vec![("TopicArn", arn_a)]);
4142 let result = svc.list_subscriptions_by_topic(&req);
4143 assert_ok(&result);
4144 let body = response_body(&result);
4145 let count = body.matches("<member>").count();
4146 assert_eq!(
4147 count, 2,
4148 "Topic A should have 2 subscriptions, found {}",
4149 count
4150 );
4151 }
4152
4153 #[test]
4156 fn publish_to_topic_stores_message() {
4157 let (svc, state) = make_sns();
4158 assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "pub-topic")])));
4159
4160 let topic_arn = "arn:aws:sns:us-east-1:123456789012:pub-topic";
4161 let req = sns_request(
4162 "Publish",
4163 vec![
4164 ("TopicArn", topic_arn),
4165 ("Message", "Hello world"),
4166 ("Subject", "Test subject"),
4167 ],
4168 );
4169 let result = svc.publish(&req);
4170 assert_ok(&result);
4171 let body = response_body(&result);
4172 assert!(
4173 body.contains("<MessageId>"),
4174 "Response should contain MessageId"
4175 );
4176
4177 let s = state.read();
4178 assert_eq!(s.published.len(), 1);
4179 assert_eq!(s.published[0].message, "Hello world");
4180 assert_eq!(s.published[0].subject.as_deref(), Some("Test subject"));
4181 }
4182
4183 #[test]
4184 fn publish_to_nonexistent_topic_returns_error() {
4185 let (svc, _state) = make_sns();
4186 let req = sns_request(
4187 "Publish",
4188 vec![
4189 ("TopicArn", "arn:aws:sns:us-east-1:123456789012:nope"),
4190 ("Message", "Hello"),
4191 ],
4192 );
4193 let result = svc.publish(&req);
4194 assert!(result.is_err(), "Publish to nonexistent topic should error");
4195 }
4196
4197 #[test]
4198 fn publish_without_topic_or_phone_returns_error() {
4199 let (svc, _state) = make_sns();
4200 let req = sns_request("Publish", vec![("Message", "Hello")]);
4201 let result = svc.publish(&req);
4202 assert!(result.is_err(), "Publish without target should error");
4203 }
4204
4205 #[test]
4206 fn publish_validates_subject_length() {
4207 let (svc, _state) = make_sns();
4208 assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "subj-topic")])));
4209
4210 let topic_arn = "arn:aws:sns:us-east-1:123456789012:subj-topic";
4211 let long_subject = "x".repeat(101);
4212 let req = sns_request(
4213 "Publish",
4214 vec![
4215 ("TopicArn", topic_arn),
4216 ("Message", "Hello"),
4217 ("Subject", &long_subject),
4218 ],
4219 );
4220 let result = svc.publish(&req);
4221 assert!(result.is_err(), "Subject > 100 chars should error");
4222 }
4223
4224 #[test]
4225 fn publish_to_sms_phone_number() {
4226 let (svc, state) = make_sns();
4227 let req = sns_request(
4228 "Publish",
4229 vec![("PhoneNumber", "+15551234567"), ("Message", "SMS test")],
4230 );
4231 let result = svc.publish(&req);
4232 assert_ok(&result);
4233
4234 let s = state.read();
4235 assert_eq!(s.sms_messages.len(), 1);
4236 assert_eq!(s.sms_messages[0].0, "+15551234567");
4237 assert_eq!(s.sms_messages[0].1, "SMS test");
4238 }
4239
4240 #[test]
4241 fn publish_to_invalid_phone_returns_error() {
4242 let (svc, _state) = make_sns();
4243 let req = sns_request(
4244 "Publish",
4245 vec![("PhoneNumber", "not-a-phone"), ("Message", "SMS test")],
4246 );
4247 let result = svc.publish(&req);
4248 assert!(result.is_err(), "Invalid phone should error");
4249 }
4250
4251 #[test]
4252 fn publish_batch_stores_multiple_messages() {
4253 let (svc, state) = make_sns();
4254 assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "batch-topic")])));
4255
4256 let topic_arn = "arn:aws:sns:us-east-1:123456789012:batch-topic";
4257 let req = sns_request(
4258 "PublishBatch",
4259 vec![
4260 ("TopicArn", topic_arn),
4261 ("PublishBatchRequestEntries.member.1.Id", "msg1"),
4262 ("PublishBatchRequestEntries.member.1.Message", "Hello 1"),
4263 ("PublishBatchRequestEntries.member.2.Id", "msg2"),
4264 ("PublishBatchRequestEntries.member.2.Message", "Hello 2"),
4265 ],
4266 );
4267 let result = svc.publish_batch(&req);
4268 assert_ok(&result);
4269 let body = response_body(&result);
4270 assert!(
4271 body.contains("<Successful>"),
4272 "Response should contain Successful element"
4273 );
4274
4275 let s = state.read();
4276 assert_eq!(s.published.len(), 2);
4277 }
4278
4279 #[test]
4280 fn publish_batch_rejects_duplicate_ids() {
4281 let (svc, _state) = make_sns();
4282 assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "batch-dup")])));
4283
4284 let topic_arn = "arn:aws:sns:us-east-1:123456789012:batch-dup";
4285 let req = sns_request(
4286 "PublishBatch",
4287 vec![
4288 ("TopicArn", topic_arn),
4289 ("PublishBatchRequestEntries.member.1.Id", "same"),
4290 ("PublishBatchRequestEntries.member.1.Message", "Hello 1"),
4291 ("PublishBatchRequestEntries.member.2.Id", "same"),
4292 ("PublishBatchRequestEntries.member.2.Message", "Hello 2"),
4293 ],
4294 );
4295 let result = svc.publish_batch(&req);
4296 assert!(result.is_err(), "Duplicate batch IDs should error");
4297 }
4298
4299 #[test]
4302 fn get_subscription_attributes_returns_defaults() {
4303 let (svc, state) = make_sns();
4304 assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "attr-topic")])));
4305
4306 let topic_arn = "arn:aws:sns:us-east-1:123456789012:attr-topic";
4307 assert_ok(&svc.subscribe(&sns_request(
4308 "Subscribe",
4309 vec![
4310 ("TopicArn", topic_arn),
4311 ("Protocol", "email"),
4312 ("Endpoint", "u@example.com"),
4313 ],
4314 )));
4315
4316 let sub_arn = {
4317 let s = state.read();
4318 s.subscriptions.keys().next().unwrap().clone()
4319 };
4320
4321 let req = sns_request(
4322 "GetSubscriptionAttributes",
4323 vec![("SubscriptionArn", &sub_arn)],
4324 );
4325 let result = svc.get_subscription_attributes(&req);
4326 assert_ok(&result);
4327 let body = response_body(&result);
4328 assert!(
4329 body.contains("<key>Protocol</key>"),
4330 "Should contain Protocol attribute"
4331 );
4332 assert!(
4333 body.contains("<value>email</value>"),
4334 "Protocol should be email"
4335 );
4336 assert!(
4337 body.contains("<key>Endpoint</key>"),
4338 "Should contain Endpoint attribute"
4339 );
4340 assert!(
4341 body.contains("<key>RawMessageDelivery</key>"),
4342 "Should contain RawMessageDelivery"
4343 );
4344 }
4345
4346 #[test]
4347 fn set_subscription_attributes_updates_value() {
4348 let (svc, state) = make_sns();
4349 assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "setattr-topic")])));
4350
4351 let topic_arn = "arn:aws:sns:us-east-1:123456789012:setattr-topic";
4352 assert_ok(&svc.subscribe(&sns_request(
4353 "Subscribe",
4354 vec![
4355 ("TopicArn", topic_arn),
4356 ("Protocol", "email"),
4357 ("Endpoint", "u@example.com"),
4358 ],
4359 )));
4360
4361 let sub_arn = {
4362 let s = state.read();
4363 s.subscriptions.keys().next().unwrap().clone()
4364 };
4365
4366 let req = sns_request(
4368 "SetSubscriptionAttributes",
4369 vec![
4370 ("SubscriptionArn", &sub_arn),
4371 ("AttributeName", "RawMessageDelivery"),
4372 ("AttributeValue", "true"),
4373 ],
4374 );
4375 assert_ok(&svc.set_subscription_attributes(&req));
4376
4377 let s = state.read();
4379 let sub = s.subscriptions.get(&sub_arn).unwrap();
4380 assert_eq!(sub.attributes.get("RawMessageDelivery").unwrap(), "true");
4381 }
4382
4383 #[test]
4384 fn set_subscription_attributes_rejects_invalid_attr() {
4385 let (svc, state) = make_sns();
4386 assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "inv-attr")])));
4387
4388 let topic_arn = "arn:aws:sns:us-east-1:123456789012:inv-attr";
4389 assert_ok(&svc.subscribe(&sns_request(
4390 "Subscribe",
4391 vec![
4392 ("TopicArn", topic_arn),
4393 ("Protocol", "email"),
4394 ("Endpoint", "u@example.com"),
4395 ],
4396 )));
4397
4398 let sub_arn = {
4399 let s = state.read();
4400 s.subscriptions.keys().next().unwrap().clone()
4401 };
4402
4403 let req = sns_request(
4404 "SetSubscriptionAttributes",
4405 vec![
4406 ("SubscriptionArn", &sub_arn),
4407 ("AttributeName", "BogusAttribute"),
4408 ("AttributeValue", "whatever"),
4409 ],
4410 );
4411 let result = svc.set_subscription_attributes(&req);
4412 assert!(result.is_err(), "Invalid attribute name should error");
4413 }
4414
4415 #[test]
4416 fn get_subscription_attributes_nonexistent_returns_error() {
4417 let (svc, _state) = make_sns();
4418 let req = sns_request(
4419 "GetSubscriptionAttributes",
4420 vec![(
4421 "SubscriptionArn",
4422 "arn:aws:sns:us-east-1:123456789012:nope:fake",
4423 )],
4424 );
4425 let result = svc.get_subscription_attributes(&req);
4426 assert!(result.is_err(), "Nonexistent subscription should error");
4427 }
4428
4429 #[test]
4432 fn tag_untag_list_tags_lifecycle() {
4433 let (svc, _state) = make_sns();
4434 assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "tag-topic")])));
4435
4436 let topic_arn = "arn:aws:sns:us-east-1:123456789012:tag-topic";
4437
4438 let req = sns_request(
4440 "TagResource",
4441 vec![
4442 ("ResourceArn", topic_arn),
4443 ("Tags.member.1.Key", "env"),
4444 ("Tags.member.1.Value", "prod"),
4445 ("Tags.member.2.Key", "team"),
4446 ("Tags.member.2.Value", "platform"),
4447 ],
4448 );
4449 assert_ok(&svc.tag_resource(&req));
4450
4451 let req = sns_request("ListTagsForResource", vec![("ResourceArn", topic_arn)]);
4453 let result = svc.list_tags_for_resource(&req);
4454 assert_ok(&result);
4455 let body = response_body(&result);
4456 assert!(
4457 body.contains("<Key>env</Key>"),
4458 "Should contain env tag key"
4459 );
4460 assert!(
4461 body.contains("<Value>prod</Value>"),
4462 "Should contain prod tag value"
4463 );
4464 assert!(
4465 body.contains("<Key>team</Key>"),
4466 "Should contain team tag key"
4467 );
4468
4469 let req = sns_request(
4471 "UntagResource",
4472 vec![("ResourceArn", topic_arn), ("TagKeys.member.1", "env")],
4473 );
4474 assert_ok(&svc.untag_resource(&req));
4475
4476 let req = sns_request("ListTagsForResource", vec![("ResourceArn", topic_arn)]);
4478 let result = svc.list_tags_for_resource(&req);
4479 assert_ok(&result);
4480 let body = response_body(&result);
4481 assert!(
4482 !body.contains("<Key>env</Key>"),
4483 "env tag should be removed"
4484 );
4485 assert!(body.contains("<Key>team</Key>"), "team tag should remain");
4486 }
4487
4488 #[test]
4489 fn tag_resource_nonexistent_returns_error() {
4490 let (svc, _state) = make_sns();
4491 let req = sns_request(
4492 "TagResource",
4493 vec![
4494 ("ResourceArn", "arn:aws:sns:us-east-1:123456789012:nope"),
4495 ("Tags.member.1.Key", "k"),
4496 ("Tags.member.1.Value", "v"),
4497 ],
4498 );
4499 let result = svc.tag_resource(&req);
4500 assert!(result.is_err(), "Tagging nonexistent resource should error");
4501 }
4502
4503 #[test]
4504 fn tag_resource_overwrites_existing_key() {
4505 let (svc, _state) = make_sns();
4506 assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "tag-overwrite")])));
4507
4508 let topic_arn = "arn:aws:sns:us-east-1:123456789012:tag-overwrite";
4509
4510 let req = sns_request(
4512 "TagResource",
4513 vec![
4514 ("ResourceArn", topic_arn),
4515 ("Tags.member.1.Key", "env"),
4516 ("Tags.member.1.Value", "dev"),
4517 ],
4518 );
4519 assert_ok(&svc.tag_resource(&req));
4520
4521 let req = sns_request(
4523 "TagResource",
4524 vec![
4525 ("ResourceArn", topic_arn),
4526 ("Tags.member.1.Key", "env"),
4527 ("Tags.member.1.Value", "prod"),
4528 ],
4529 );
4530 assert_ok(&svc.tag_resource(&req));
4531
4532 let req = sns_request("ListTagsForResource", vec![("ResourceArn", topic_arn)]);
4534 let body = response_body(&svc.list_tags_for_resource(&req));
4535 assert!(
4536 body.contains("<Value>prod</Value>"),
4537 "Tag value should be overwritten to prod"
4538 );
4539 assert_eq!(
4541 body.matches("<member>").count(),
4542 1,
4543 "Should have exactly 1 tag"
4544 );
4545 }
4546
4547 #[test]
4550 fn set_and_get_topic_attributes() {
4551 let (svc, _state) = make_sns();
4552 assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "attr-topic2")])));
4553
4554 let topic_arn = "arn:aws:sns:us-east-1:123456789012:attr-topic2";
4555
4556 let req = sns_request(
4558 "SetTopicAttributes",
4559 vec![
4560 ("TopicArn", topic_arn),
4561 ("AttributeName", "DisplayName"),
4562 ("AttributeValue", "My Nice Topic"),
4563 ],
4564 );
4565 assert_ok(&svc.set_topic_attributes(&req));
4566
4567 let req = sns_request("GetTopicAttributes", vec![("TopicArn", topic_arn)]);
4569 let result = svc.get_topic_attributes(&req);
4570 assert_ok(&result);
4571 let body = response_body(&result);
4572 assert!(
4573 body.contains("<value>My Nice Topic</value>"),
4574 "DisplayName should be set"
4575 );
4576 assert!(
4577 body.contains("<key>TopicArn</key>"),
4578 "Should contain TopicArn"
4579 );
4580 assert!(body.contains("<key>Owner</key>"), "Should contain Owner");
4581 }
4582
4583 #[test]
4584 fn get_topic_attributes_nonexistent_returns_error() {
4585 let (svc, _state) = make_sns();
4586 let req = sns_request(
4587 "GetTopicAttributes",
4588 vec![("TopicArn", "arn:aws:sns:us-east-1:123456789012:nope")],
4589 );
4590 let result = svc.get_topic_attributes(&req);
4591 assert!(result.is_err(), "Nonexistent topic should error");
4592 }
4593
4594 #[test]
4595 fn get_topic_attributes_wrong_region_returns_error() {
4596 let (svc, _state) = make_sns();
4597 assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "region-topic")])));
4598
4599 let req = sns_request(
4601 "GetTopicAttributes",
4602 vec![(
4603 "TopicArn",
4604 "arn:aws:sns:eu-west-1:123456789012:region-topic",
4605 )],
4606 );
4607 let result = svc.get_topic_attributes(&req);
4608 assert!(result.is_err(), "Topic in wrong region should error");
4609 }
4610
4611 #[test]
4614 fn confirm_subscription_returns_arn() {
4615 let (svc, state) = make_sns();
4616 assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "confirm-topic")])));
4617
4618 let topic_arn = "arn:aws:sns:us-east-1:123456789012:confirm-topic";
4619
4620 assert_ok(&svc.subscribe(&sns_request(
4622 "Subscribe",
4623 vec![
4624 ("TopicArn", topic_arn),
4625 ("Protocol", "http"),
4626 ("Endpoint", "http://example.com/hook"),
4627 ],
4628 )));
4629
4630 let token = {
4632 let s = state.read();
4633 s.subscriptions
4634 .values()
4635 .find(|sub| sub.topic_arn == topic_arn && !sub.confirmed)
4636 .expect("should have a pending subscription")
4637 .confirmation_token
4638 .clone()
4639 .expect("pending subscription should have a token")
4640 };
4641
4642 let req = sns_request(
4643 "ConfirmSubscription",
4644 vec![("TopicArn", topic_arn), ("Token", &token)],
4645 );
4646 let result = svc.confirm_subscription(&req);
4647 assert_ok(&result);
4648 let body = response_body(&result);
4649 assert!(
4650 body.contains("<SubscriptionArn>"),
4651 "Should return a SubscriptionArn"
4652 );
4653
4654 let s = state.read();
4656 let sub = s
4657 .subscriptions
4658 .values()
4659 .find(|sub| sub.topic_arn == topic_arn)
4660 .expect("subscription should exist");
4661 assert!(sub.confirmed, "subscription should be confirmed");
4662 }
4663
/// `ConfirmSubscription` must fail when the supplied token matches no
/// subscription, and the failed attempt must leave the subscription pending.
#[test]
fn confirm_subscription_rejects_invalid_token() {
    let (svc, state) = make_sns();
    assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "confirm-topic")])));

    let topic_arn = "arn:aws:sns:us-east-1:123456789012:confirm-topic";

    assert_ok(&svc.subscribe(&sns_request(
        "Subscribe",
        vec![
            ("TopicArn", topic_arn),
            ("Protocol", "http"),
            ("Endpoint", "http://example.com/hook"),
        ],
    )));

    let req = sns_request(
        "ConfirmSubscription",
        vec![("TopicArn", topic_arn), ("Token", "wrong-token")],
    );
    let result = svc.confirm_subscription(&req);
    assert!(result.is_err(), "Should reject invalid token");

    // Returning an error is not enough: the rejected call must not have
    // confirmed the subscription as a side effect.
    let s = state.read();
    let sub = s
        .subscriptions
        .values()
        .find(|sub| sub.topic_arn == topic_arn)
        .expect("subscription should exist");
    assert!(
        !sub.confirmed,
        "subscription should still be pending after a rejected token"
    );
}
4689
/// With two pending HTTP subscriptions on one topic, confirming with the
/// second subscription's token must confirm that subscription only and
/// return its ARN.
#[test]
fn confirm_subscription_matches_correct_pending_sub() {
    const TOPIC_ARN: &str = "arn:aws:sns:us-east-1:123456789012:multi-topic";
    const FIRST_ENDPOINT: &str = "http://first.example.com/hook";
    const SECOND_ENDPOINT: &str = "http://second.example.com/hook";

    let (svc, state) = make_sns();
    let create = sns_request("CreateTopic", vec![("Name", "multi-topic")]);
    assert_ok(&svc.create_topic(&create));

    // Subscribe both endpoints, first then second.
    for endpoint in [FIRST_ENDPOINT, SECOND_ENDPOINT] {
        let subscribe = sns_request(
            "Subscribe",
            vec![
                ("TopicArn", TOPIC_ARN),
                ("Protocol", "http"),
                ("Endpoint", endpoint),
            ],
        );
        assert_ok(&svc.subscribe(&subscribe));
    }

    // Capture the second subscription's identity while holding the read
    // guard only for the duration of this block.
    let (second_arn, second_token) = {
        let guard = state.read();
        let pending = guard
            .subscriptions
            .values()
            .find(|sub| sub.endpoint == SECOND_ENDPOINT)
            .expect("should have second subscription");
        let arn = pending.subscription_arn.clone();
        let token = pending.confirmation_token.clone().unwrap();
        (arn, token)
    };

    let outcome = svc.confirm_subscription(&sns_request(
        "ConfirmSubscription",
        vec![("TopicArn", TOPIC_ARN), ("Token", &second_token)],
    ));
    assert_ok(&outcome);
    assert!(
        response_body(&outcome).contains(&second_arn),
        "Should return the second subscription's ARN"
    );

    let guard = state.read();
    for sub in guard.subscriptions.values() {
        match sub.endpoint.as_str() {
            SECOND_ENDPOINT => {
                assert!(sub.confirmed, "second subscription should be confirmed")
            }
            _ => assert!(!sub.confirmed, "first subscription should still be pending"),
        }
    }
}
4752
/// `ConfirmSubscription` accepts the subscription ARN in place of the
/// confirmation token and flips the pending subscription to confirmed.
#[test]
fn confirm_subscription_accepts_sub_arn_as_token() {
    let (svc, state) = make_sns();
    assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "arn-token")])));

    let topic_arn = "arn:aws:sns:us-east-1:123456789012:arn-token";

    assert_ok(&svc.subscribe(&sns_request(
        "Subscribe",
        vec![
            ("TopicArn", topic_arn),
            ("Protocol", "http"),
            ("Endpoint", "http://example.com/hook"),
        ],
    )));

    // Require the subscription to be pending up front; otherwise the final
    // `confirmed` assertion could pass without the call under test having
    // done anything.
    let sub_arn = {
        let s = state.read();
        s.subscriptions
            .values()
            .find(|sub| sub.topic_arn == topic_arn && !sub.confirmed)
            .expect("should have a pending subscription")
            .subscription_arn
            .clone()
    };

    let req = sns_request(
        "ConfirmSubscription",
        vec![("TopicArn", topic_arn), ("Token", &sub_arn)],
    );
    let result = svc.confirm_subscription(&req);
    assert_ok(&result);

    // Look the subscription up by the exact ARN that was used as the token.
    let s = state.read();
    let sub = s
        .subscriptions
        .values()
        .find(|sub| sub.subscription_arn == sub_arn)
        .expect("subscription should exist");
    assert!(sub.confirmed, "subscription should be confirmed");
}
4798
/// Creates two topics, deletes one, and checks that `ListTopics` reflects
/// each step.
#[test]
fn create_delete_list_topics_lifecycle() {
    let (svc, _state) = make_sns();

    // Counts `<TopicArn>` elements in a ListTopics response body.
    let topic_count = |xml: &str| xml.matches("<TopicArn>").count();

    for name in ["topic1", "topic2"] {
        let create = sns_request("CreateTopic", vec![("Name", name)]);
        assert_ok(&svc.create_topic(&create));
    }

    let before = response_body(&svc.list_topics(&sns_request("ListTopics", vec![])));
    assert_eq!(topic_count(&before), 2);

    let delete = sns_request(
        "DeleteTopic",
        vec![("TopicArn", "arn:aws:sns:us-east-1:123456789012:topic1")],
    );
    assert_ok(&svc.delete_topic(&delete));

    let after = response_body(&svc.list_topics(&sns_request("ListTopics", vec![])));
    assert_eq!(topic_count(&after), 1);
    assert!(after.contains("topic2"), "topic2 should remain");
}
4826
/// Issuing `CreateTopic` twice with the same name must succeed both times
/// and yield the same response body.
#[test]
fn create_topic_idempotent() {
    let (svc, _state) = make_sns();
    let make_request = || sns_request("CreateTopic", vec![("Name", "idem-topic")]);

    let first = svc.create_topic(&make_request());
    assert_ok(&first);
    let second = svc.create_topic(&make_request());
    assert_ok(&second);

    assert_eq!(
        response_body(&first),
        response_body(&second),
        "Creating same topic twice should be idempotent"
    );
}
4841
/// `AddPermission` must add a statement with the given label to the topic's
/// stored `Policy` attribute, and `RemovePermission` must take it out again.
#[test]
fn add_and_remove_permission() {
    let (svc, state) = make_sns();
    assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "perm-topic")])));

    let topic_arn = "arn:aws:sns:us-east-1:123456789012:perm-topic";

    // Parses the topic's stored policy and reports whether any statement
    // carries the given Sid. The read guard is released before returning so
    // the service calls below can take the lock again.
    let policy_has_sid = |sid: &str| -> bool {
        let guard = state.read();
        let raw = guard
            .topics
            .get(topic_arn)
            .unwrap()
            .attributes
            .get("Policy")
            .unwrap();
        let document: Value = serde_json::from_str(raw).unwrap();
        let statements = document["Statement"].as_array().unwrap();
        let found = statements
            .iter()
            .any(|stmt| stmt["Sid"].as_str() == Some(sid));
        found
    };

    let add = sns_request(
        "AddPermission",
        vec![
            ("TopicArn", topic_arn),
            ("Label", "MyPermission"),
            ("AWSAccountId.member.1", "111111111111"),
            ("ActionName.member.1", "Publish"),
        ],
    );
    assert_ok(&svc.add_permission(&add));
    assert!(
        policy_has_sid("MyPermission"),
        "Policy should contain MyPermission statement"
    );

    let remove = sns_request(
        "RemovePermission",
        vec![("TopicArn", topic_arn), ("Label", "MyPermission")],
    );
    assert_ok(&svc.remove_permission(&remove));
    assert!(
        !policy_has_sid("MyPermission"),
        "MyPermission should be removed"
    );
}
4908
/// Publishing to a FIFO topic without a `MessageGroupId` must be rejected.
#[test]
fn publish_to_fifo_topic_requires_group_id() {
    let (svc, _state) = make_sns();

    // Mark the topic as FIFO through the CreateTopic attribute entries.
    let mut create = sns_request("CreateTopic", vec![("Name", "fifo-topic.fifo")]);
    for (key, value) in [
        ("Attributes.entry.1.key", "FifoTopic"),
        ("Attributes.entry.1.value", "true"),
    ] {
        create
            .query_params
            .insert(key.to_string(), value.to_string());
    }
    assert_ok(&svc.create_topic(&create));

    let topic_arn = "arn:aws:sns:us-east-1:123456789012:fifo-topic.fifo";
    let outcome = svc.publish(&sns_request(
        "Publish",
        vec![("TopicArn", topic_arn), ("Message", "Hello")],
    ));
    assert!(
        outcome.is_err(),
        "FIFO publish without MessageGroupId should error"
    );
}
4935
/// An attribute stored with `SetSMSAttributes` must show up in the
/// `GetSMSAttributes` response.
#[test]
fn set_and_get_sms_attributes() {
    let (svc, _state) = make_sns();

    let mut set = sns_request("SetSMSAttributes", vec![]);
    for (key, value) in [
        ("attributes.entry.1.key", "DefaultSMSType"),
        ("attributes.entry.1.value", "Transactional"),
    ] {
        set.query_params.insert(key.to_string(), value.to_string());
    }
    assert_ok(&svc.set_sms_attributes(&set));

    let fetched = svc.get_sms_attributes(&sns_request("GetSMSAttributes", vec![]));
    assert_ok(&fetched);
    assert!(
        response_body(&fetched).contains("DefaultSMSType"),
        "Should contain set SMS attribute"
    );
}
4962
/// After seeding the default opted-out numbers, `CheckIfPhoneNumberIsOptedOut`
/// must report the seeded number as opted out.
#[test]
fn check_phone_opted_out() {
    let (svc, state) = make_sns();
    state.write().seed_default_opted_out();

    let response = svc.check_if_phone_number_is_opted_out(&sns_request(
        "CheckIfPhoneNumberIsOptedOut",
        vec![("phoneNumber", "+15005550099")],
    ));
    assert_ok(&response);

    let xml = response_body(&response);
    assert!(
        xml.contains("<isOptedOut>true</isOptedOut>"),
        "Seeded number should be opted out"
    );
}
4982
/// After seeding the default opted-out numbers, `ListPhoneNumbersOptedOut`
/// must include the seeded number in its response.
#[test]
fn list_phone_numbers_opted_out() {
    let (svc, state) = make_sns();
    state.write().seed_default_opted_out();

    let response =
        svc.list_phone_numbers_opted_out(&sns_request("ListPhoneNumbersOptedOut", vec![]));
    assert_ok(&response);

    let xml = response_body(&response);
    assert!(
        xml.contains("+15005550099"),
        "Should list seeded opted-out number"
    );
}
4997
/// `OptInPhoneNumber` must remove a seeded number from the opted-out set.
#[test]
fn opt_in_phone_number() {
    let phone = "+15005550099";

    let (svc, state) = make_sns();
    state.write().seed_default_opted_out();

    let opt_in = sns_request("OptInPhoneNumber", vec![("phoneNumber", phone)]);
    assert_ok(&svc.opt_in_phone_number(&opt_in));

    let owned_phone = phone.to_string();
    let guard = state.read();
    let still_opted_out = guard.opted_out_numbers.contains(&owned_phone);
    assert!(!still_opted_out, "Phone should no longer be opted out");
}
5013
/// Deleting a topic must also drop every subscription attached to it.
#[test]
fn delete_topic_removes_subscriptions() {
    let (svc, state) = make_sns();
    let topic_arn = "arn:aws:sns:us-east-1:123456789012:del-sub-topic";

    // Takes a short-lived read guard on each call so no lock is held across
    // the service calls.
    let subscription_count = || state.read().subscriptions.len();

    let create = sns_request("CreateTopic", vec![("Name", "del-sub-topic")]);
    assert_ok(&svc.create_topic(&create));

    let subscribe = sns_request(
        "Subscribe",
        vec![
            ("TopicArn", topic_arn),
            ("Protocol", "email"),
            ("Endpoint", "u@example.com"),
        ],
    );
    assert_ok(&svc.subscribe(&subscribe));
    assert_eq!(subscription_count(), 1);

    let delete = sns_request("DeleteTopic", vec![("TopicArn", topic_arn)]);
    assert_ok(&svc.delete_topic(&delete));
    assert_eq!(
        subscription_count(),
        0,
        "Subscriptions should be removed with topic"
    );
}
5039
/// A subscription whose `FilterPolicy` attribute is not valid JSON must not
/// match any message (fail closed).
#[test]
fn malformed_filter_policy_does_not_match() {
    let mut attributes = HashMap::new();
    attributes.insert(
        String::from("FilterPolicy"),
        String::from("not valid json {{["),
    );

    let subscription = SnsSubscription {
        subscription_arn: String::from("arn:aws:sns:us-east-1:123456789012:t:sub-1"),
        topic_arn: String::from("arn:aws:sns:us-east-1:123456789012:t"),
        protocol: String::from("sqs"),
        endpoint: String::from("arn:aws:sqs:us-east-1:123456789012:q"),
        owner: String::from("123456789012"),
        attributes,
        confirmed: true,
        confirmation_token: None,
    };

    // The published message carries no attributes.
    let message_attributes = HashMap::new();
    let matched = matches_filter_policy(&subscription, &message_attributes, "hello");
    assert!(
        !matched,
        "malformed FilterPolicy JSON must not match (fail closed)"
    );
}
5061}