Skip to main content

fakecloud_sns/
service.rs

1use async_trait::async_trait;
2use base64::Engine;
3use chrono::Utc;
4use http::StatusCode;
5use serde_json::Value;
6use std::collections::HashMap;
7
8use fakecloud_aws::arn::Arn;
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11
12use crate::state::{
13    MessageAttribute, PlatformApplication, PlatformEndpoint, PublishedMessage, SharedSnsState,
14    SnsSubscription, SnsTopic,
15};
16
/// SNS service implementation holding the shared SNS state and a handle to
/// the delivery bus.
pub struct SnsService {
    // Shared SNS state; handlers access it through `read()` / `write()`.
    state: SharedSnsState,
    // Reference-counted handle to the delivery bus passed to `new`.
    delivery: Arc<DeliveryBus>,
}
21
22impl SnsService {
23    pub fn new(state: SharedSnsState, delivery: Arc<DeliveryBus>) -> Self {
24        Self { state, delivery }
25    }
26}
27
28use std::sync::Arc;
29
/// Maximum number of items returned in one page of a paginated listing
/// (used by `list_topics` to slice results and compute the next token).
const DEFAULT_PAGE_SIZE: usize = 100;
31
/// JSON document reported as the `EffectiveDeliveryPolicy` attribute by
/// `get_topic_attributes`; it is a fixed value and is not derived from any
/// stored `DeliveryPolicy`.
const DEFAULT_EFFECTIVE_DELIVERY_POLICY: &str = r#"{"defaultHealthyRetryPolicy":{"numNoDelayRetries":0,"numMinDelayRetries":0,"minDelayTarget":20,"maxDelayTarget":20,"numMaxDelayRetries":0,"numRetries":3,"backoffFunction":"linear"},"sicklyRetryPolicy":null,"throttlePolicy":null,"guaranteed":false}"#;
33
34fn default_policy(topic_arn: &str, account_id: &str) -> String {
35    serde_json::json!({
36        "Version": "2008-10-17",
37        "Id": "__default_policy_ID",
38        "Statement": [{
39            "Effect": "Allow",
40            "Sid": "__default_statement_ID",
41            "Principal": {"AWS": "*"},
42            "Action": [
43                "SNS:GetTopicAttributes",
44                "SNS:SetTopicAttributes",
45                "SNS:AddPermission",
46                "SNS:RemovePermission",
47                "SNS:DeleteTopic",
48                "SNS:Subscribe",
49                "SNS:ListSubscriptionsByTopic",
50                "SNS:Publish",
51            ],
52            "Resource": topic_arn,
53            "Condition": {"StringEquals": {"AWS:SourceOwner": account_id}},
54        }]
55    })
56    .to_string()
57}
58
/// SNS action names (without the `SNS:` prefix) treated as valid.
// NOTE(review): no use of this list is visible in this part of the file;
// presumably it validates the action names passed to AddPermission — confirm.
const VALID_SNS_ACTIONS: &[&str] = &[
    "GetTopicAttributes",
    "SetTopicAttributes",
    "AddPermission",
    "RemovePermission",
    "DeleteTopic",
    "Subscribe",
    "ListSubscriptionsByTopic",
    "Publish",
    "Receive",
];
70
/// Attribute names accepted on a subscription. `subscribe` rejects any
/// request attribute whose key is not in this list with `InvalidParameter`.
const VALID_SUBSCRIPTION_ATTRS: &[&str] = &[
    "RawMessageDelivery",
    "DeliveryPolicy",
    "FilterPolicy",
    "FilterPolicyScope",
    "RedrivePolicy",
    "SubscriptionRoleArn",
];
79
80#[async_trait]
81impl AwsService for SnsService {
82    fn service_name(&self) -> &str {
83        "sns"
84    }
85
86    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
87        match req.action.as_str() {
88            "CreateTopic" => self.create_topic(&req),
89            "DeleteTopic" => self.delete_topic(&req),
90            "ListTopics" => self.list_topics(&req),
91            "GetTopicAttributes" => self.get_topic_attributes(&req),
92            "SetTopicAttributes" => self.set_topic_attributes(&req),
93            "Subscribe" => self.subscribe(&req),
94            "ConfirmSubscription" => self.confirm_subscription(&req),
95            "Unsubscribe" => self.unsubscribe(&req),
96            "Publish" => self.publish(&req),
97            "PublishBatch" => self.publish_batch(&req),
98            "ListSubscriptions" => self.list_subscriptions(&req),
99            "ListSubscriptionsByTopic" => self.list_subscriptions_by_topic(&req),
100            "GetSubscriptionAttributes" => self.get_subscription_attributes(&req),
101            "SetSubscriptionAttributes" => self.set_subscription_attributes(&req),
102            "TagResource" => self.tag_resource(&req),
103            "UntagResource" => self.untag_resource(&req),
104            "ListTagsForResource" => self.list_tags_for_resource(&req),
105            "AddPermission" => self.add_permission(&req),
106            "RemovePermission" => self.remove_permission(&req),
107            // Platform application actions
108            "CreatePlatformApplication" => self.create_platform_application(&req),
109            "DeletePlatformApplication" => self.delete_platform_application(&req),
110            "GetPlatformApplicationAttributes" => self.get_platform_application_attributes(&req),
111            "SetPlatformApplicationAttributes" => self.set_platform_application_attributes(&req),
112            "ListPlatformApplications" => self.list_platform_applications(&req),
113            "CreatePlatformEndpoint" => self.create_platform_endpoint(&req),
114            "DeleteEndpoint" => self.delete_endpoint(&req),
115            "GetEndpointAttributes" => self.get_endpoint_attributes(&req),
116            "SetEndpointAttributes" => self.set_endpoint_attributes(&req),
117            "ListEndpointsByPlatformApplication" => {
118                self.list_endpoints_by_platform_application(&req)
119            }
120            // SMS actions
121            "SetSMSAttributes" => self.set_sms_attributes(&req),
122            "GetSMSAttributes" => self.get_sms_attributes(&req),
123            "CheckIfPhoneNumberIsOptedOut" => self.check_if_phone_number_is_opted_out(&req),
124            "ListPhoneNumbersOptedOut" => self.list_phone_numbers_opted_out(&req),
125            "OptInPhoneNumber" => self.opt_in_phone_number(&req),
126            _ => Err(AwsServiceError::action_not_implemented("sns", &req.action)),
127        }
128    }
129
130    fn supported_actions(&self) -> &[&str] {
131        SNS_SUPPORTED_ACTIONS
132    }
133
134    fn iam_enforceable(&self) -> bool {
135        true
136    }
137
138    /// SNS resources are topic / subscription / platform-app / endpoint
139    /// ARNs. Topic-targeted actions carry a `TopicArn`; subscription
140    /// actions carry a `SubscriptionArn`; platform-app actions carry
141    /// `PlatformApplicationArn`; endpoint actions carry `EndpointArn`.
142    /// Listing and account-scoped actions target `*`.
143    fn iam_action_for(&self, request: &AwsRequest) -> Option<fakecloud_core::auth::IamAction> {
144        let action = SNS_SUPPORTED_ACTIONS
145            .iter()
146            .copied()
147            .find(|a| *a == request.action)?;
148        let resource = match action {
149            "CreateTopic" => {
150                // The to-be-created topic ARN is built from the same
151                // account id the handler uses (state.account_id via
152                // `Arn::new`), not `principal.account_id`, so policy
153                // evaluation and the actual ARN can't diverge even if
154                // the two sources ever drift (identified by cubic on
155                // PR #399).
156                let state = self.state.read();
157                let partition = if request.region.starts_with("cn-") {
158                    "aws-cn"
159                } else if request.region.starts_with("us-gov-") {
160                    "aws-us-gov"
161                } else {
162                    "aws"
163                };
164                param(request, "Name")
165                    .map(|n| {
166                        format!(
167                            "arn:{}:sns:{}:{}:{}",
168                            partition, request.region, state.account_id, n
169                        )
170                    })
171                    .unwrap_or_else(|| "*".to_string())
172            }
173            "DeleteTopic"
174            | "GetTopicAttributes"
175            | "SetTopicAttributes"
176            | "Subscribe"
177            | "Publish"
178            | "PublishBatch"
179            | "ListSubscriptionsByTopic"
180            | "AddPermission"
181            | "RemovePermission"
182            // ConfirmSubscription is keyed by TopicArn (identified by
183            // cubic on PR #399) — the Token in the request confirms a
184            // subscription to that topic; SubscriptionArn only exists
185            // after confirmation.
186            | "ConfirmSubscription" => {
187                param(request, "TopicArn").unwrap_or_else(|| "*".to_string())
188            }
189            "Unsubscribe" | "GetSubscriptionAttributes" | "SetSubscriptionAttributes" => {
190                param(request, "SubscriptionArn").unwrap_or_else(|| "*".to_string())
191            }
192            "TagResource" | "UntagResource" | "ListTagsForResource" => {
193                param(request, "ResourceArn").unwrap_or_else(|| "*".to_string())
194            }
195            "DeletePlatformApplication"
196            | "GetPlatformApplicationAttributes"
197            | "SetPlatformApplicationAttributes"
198            | "CreatePlatformEndpoint"
199            | "ListEndpointsByPlatformApplication" => {
200                param(request, "PlatformApplicationArn").unwrap_or_else(|| "*".to_string())
201            }
202            "DeleteEndpoint" | "GetEndpointAttributes" | "SetEndpointAttributes" => {
203                param(request, "EndpointArn").unwrap_or_else(|| "*".to_string())
204            }
205            _ => "*".to_string(),
206        };
207        Some(fakecloud_core::auth::IamAction {
208            service: "sns",
209            action,
210            resource,
211        })
212    }
213}
214
/// Action names handled by this service. Returned by `supported_actions` and
/// used by `iam_action_for` to recognise the request's action.
const SNS_SUPPORTED_ACTIONS: &[&str] = &[
    "CreateTopic",
    "DeleteTopic",
    "ListTopics",
    "GetTopicAttributes",
    "SetTopicAttributes",
    "Subscribe",
    "ConfirmSubscription",
    "Unsubscribe",
    "Publish",
    "PublishBatch",
    "ListSubscriptions",
    "ListSubscriptionsByTopic",
    "GetSubscriptionAttributes",
    "SetSubscriptionAttributes",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "AddPermission",
    "RemovePermission",
    "CreatePlatformApplication",
    "DeletePlatformApplication",
    "GetPlatformApplicationAttributes",
    "SetPlatformApplicationAttributes",
    "ListPlatformApplications",
    "CreatePlatformEndpoint",
    "DeleteEndpoint",
    "GetEndpointAttributes",
    "SetEndpointAttributes",
    "ListEndpointsByPlatformApplication",
    "SetSMSAttributes",
    "GetSMSAttributes",
    "CheckIfPhoneNumberIsOptedOut",
    "ListPhoneNumbersOptedOut",
    "OptInPhoneNumber",
];
251
252/// SNS uses Query protocol — params come from query_params (which includes form body).
253fn param(req: &AwsRequest, name: &str) -> Option<String> {
254    // Try query params first (Query protocol)
255    if let Some(v) = req.query_params.get(name) {
256        return Some(v.clone());
257    }
258    // Try JSON body (JSON protocol)
259    if let Ok(body) = serde_json::from_slice::<Value>(&req.body) {
260        if let Some(s) = body[name].as_str() {
261            return Some(s.to_string());
262        }
263    }
264    None
265}
266
267fn required(req: &AwsRequest, name: &str) -> Result<String, AwsServiceError> {
268    param(req, name).ok_or_else(|| {
269        AwsServiceError::aws_error(
270            StatusCode::BAD_REQUEST,
271            "InvalidParameter",
272            format!("The request must contain the parameter {name}"),
273        )
274    })
275}
276
277fn validate_message_structure_json(message: &str) -> Result<(), AwsServiceError> {
278    let parsed: Value = serde_json::from_str(message).map_err(|_| {
279        AwsServiceError::aws_error(
280            StatusCode::BAD_REQUEST,
281            "InvalidParameter",
282            "Invalid parameter: Message Structure - No JSON message body is parseable",
283        )
284    })?;
285    if parsed.get("default").is_none() {
286        return Err(AwsServiceError::aws_error(
287            StatusCode::BAD_REQUEST,
288            "InvalidParameter",
289            "Invalid parameter: Message Structure - No default entry in JSON message body",
290        ));
291    }
292    Ok(())
293}
294
295fn not_found(entity: &str) -> AwsServiceError {
296    AwsServiceError::aws_error(
297        StatusCode::NOT_FOUND,
298        "NotFound",
299        format!("{entity} does not exist"),
300    )
301}
302
/// Extracts the region component of an ARN.
///
/// The region is the fourth `:`-separated field (index 3). Returns `None`
/// when the string has fewer than four fields; an empty field is returned
/// as `Some("")`.
fn arn_region(arn: &str) -> Option<&str> {
    arn.split(':').nth(3)
}
312
313/// SNS uses XML responses for Query protocol.
314fn xml_resp(inner: &str, _request_id: &str) -> AwsResponse {
315    let xml = format!("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n{inner}\n");
316    AwsResponse::xml(StatusCode::OK, xml)
317}
318
/// Error message returned by `validate_topic_name` when a FIFO topic name is
/// rejected.
const FIFO_NAME_ERROR: &str = "Fifo Topic names must end with .fifo and must be made up of only uppercase and lowercase ASCII letters, numbers, underscores, and hyphens, and must be between 1 and 256 characters long.";
/// Error message returned by `validate_topic_name` when a standard topic name
/// is rejected (also used for empty or over-long names).
const STANDARD_NAME_ERROR: &str = "Topic names must be made up of only uppercase and lowercase ASCII letters, numbers, underscores, and hyphens, and must be between 1 and 256 characters long.";
321
322/// Validate a topic name according to AWS rules
323fn validate_topic_name(name: &str, is_fifo_attr: bool) -> Result<(), AwsServiceError> {
324    if name.is_empty() || name.len() > 256 {
325        return Err(AwsServiceError::aws_error(
326            StatusCode::BAD_REQUEST,
327            "InvalidParameter",
328            STANDARD_NAME_ERROR,
329        ));
330    }
331
332    let base_name = name.strip_suffix(".fifo").unwrap_or(name);
333    let valid_chars = base_name
334        .chars()
335        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_');
336
337    if !valid_chars {
338        let msg = if name.ends_with(".fifo") || is_fifo_attr {
339            FIFO_NAME_ERROR
340        } else {
341            STANDARD_NAME_ERROR
342        };
343        return Err(AwsServiceError::aws_error(
344            StatusCode::BAD_REQUEST,
345            "InvalidParameter",
346            msg,
347        ));
348    }
349
350    // FIFO validation
351    if is_fifo_attr && !name.ends_with(".fifo") {
352        return Err(AwsServiceError::aws_error(
353            StatusCode::BAD_REQUEST,
354            "InvalidParameter",
355            FIFO_NAME_ERROR,
356        ));
357    }
358
359    if name.ends_with(".fifo") && !is_fifo_attr {
360        return Err(AwsServiceError::aws_error(
361            StatusCode::BAD_REQUEST,
362            "InvalidParameter",
363            STANDARD_NAME_ERROR,
364        ));
365    }
366
367    Ok(())
368}
369
370impl SnsService {
371    fn create_topic(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
372        let name = required(req, "Name")?;
373
374        // Parse attributes from Attributes.entry.N.key / Attributes.entry.N.value
375        let topic_attrs = parse_entries(req, "Attributes");
376        let is_fifo_attr = topic_attrs
377            .get("FifoTopic")
378            .map(|v| v.eq_ignore_ascii_case("true"))
379            .unwrap_or(false);
380        let is_fifo = name.ends_with(".fifo");
381
382        validate_topic_name(&name, is_fifo_attr)?;
383
384        // Parse tags from request
385        let tags = parse_tags(req);
386
387        let mut state = self.state.write();
388        let topic_arn = Arn::new("sns", &req.region, &state.account_id, &name).to_string();
389
390        if !state.topics.contains_key(&topic_arn) {
391            let mut attributes = HashMap::new();
392            // Set default policy
393            attributes.insert(
394                "Policy".to_string(),
395                default_policy(&topic_arn, &state.account_id),
396            );
397            attributes.insert("DisplayName".to_string(), String::new());
398            attributes.insert("DeliveryPolicy".to_string(), String::new());
399
400            if is_fifo {
401                attributes.insert("FifoTopic".to_string(), "true".to_string());
402                attributes.insert("ContentBasedDeduplication".to_string(), "false".to_string());
403            }
404
405            // Apply topic attributes from the request
406            for (k, v) in &topic_attrs {
407                // Normalize boolean-like values for FifoTopic and ContentBasedDeduplication
408                if k == "FifoTopic" || k == "ContentBasedDeduplication" {
409                    let normalized = if v.eq_ignore_ascii_case("true") {
410                        "true"
411                    } else {
412                        "false"
413                    };
414                    if k == "FifoTopic" && normalized == "false" {
415                        attributes.remove("FifoTopic");
416                        attributes.remove("ContentBasedDeduplication");
417                        continue;
418                    }
419                    attributes.insert(k.clone(), normalized.to_string());
420                    continue;
421                }
422                attributes.insert(k.clone(), v.clone());
423            }
424
425            let topic = SnsTopic {
426                topic_arn: topic_arn.clone(),
427                name,
428                attributes,
429                tags,
430                is_fifo,
431                created_at: Utc::now(),
432            };
433            state.topics.insert(topic_arn.clone(), topic);
434        }
435
436        Ok(xml_resp(
437            &format!(
438                r#"<CreateTopicResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
439  <CreateTopicResult>
440    <TopicArn>{topic_arn}</TopicArn>
441  </CreateTopicResult>
442  <ResponseMetadata>
443    <RequestId>{}</RequestId>
444  </ResponseMetadata>
445</CreateTopicResponse>"#,
446                req.request_id
447            ),
448            &req.request_id,
449        ))
450    }
451
452    fn delete_topic(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
453        let topic_arn = required(req, "TopicArn")?;
454        let mut state = self.state.write();
455        state.topics.remove(&topic_arn);
456        state
457            .subscriptions
458            .retain(|_, sub| sub.topic_arn != topic_arn);
459
460        Ok(xml_resp(
461            &format!(
462                r#"<DeleteTopicResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
463  <ResponseMetadata>
464    <RequestId>{}</RequestId>
465  </ResponseMetadata>
466</DeleteTopicResponse>"#,
467                req.request_id
468            ),
469            &req.request_id,
470        ))
471    }
472
473    fn list_topics(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
474        let state = self.state.read();
475
476        // Filter topics by region
477        let all_topics: Vec<&SnsTopic> = state
478            .topics
479            .values()
480            .filter(|t| {
481                // Extract region from ARN
482                let parts: Vec<&str> = t.topic_arn.split(':').collect();
483                parts.len() >= 4 && parts[3] == req.region
484            })
485            .collect();
486
487        let next_token = param(req, "NextToken")
488            .and_then(|t| t.parse::<usize>().ok())
489            .unwrap_or(0);
490        let next_token = next_token.min(all_topics.len());
491
492        let page = &all_topics[next_token..];
493        let has_more = page.len() > DEFAULT_PAGE_SIZE;
494        let page = if has_more {
495            &page[..DEFAULT_PAGE_SIZE]
496        } else {
497            page
498        };
499
500        let members: String = page
501            .iter()
502            .map(|t| {
503                format!(
504                    "      <member><TopicArn>{}</TopicArn></member>",
505                    t.topic_arn
506                )
507            })
508            .collect::<Vec<_>>()
509            .join("\n");
510
511        let next_token_xml = if has_more {
512            format!(
513                "\n    <NextToken>{}</NextToken>",
514                next_token + DEFAULT_PAGE_SIZE
515            )
516        } else {
517            String::new()
518        };
519
520        Ok(xml_resp(
521            &format!(
522                r#"<ListTopicsResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
523  <ListTopicsResult>
524    <Topics>
525{members}
526    </Topics>{next_token_xml}
527  </ListTopicsResult>
528  <ResponseMetadata>
529    <RequestId>{}</RequestId>
530  </ResponseMetadata>
531</ListTopicsResponse>"#,
532                req.request_id
533            ),
534            &req.request_id,
535        ))
536    }
537
538    fn get_topic_attributes(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
539        let topic_arn = required(req, "TopicArn")?;
540
541        // Check region: topic must belong to the request's region
542        if let Some(topic_region) = arn_region(&topic_arn) {
543            if topic_region != req.region {
544                return Err(not_found("Topic"));
545            }
546        }
547
548        let state = self.state.read();
549        let topic = state
550            .topics
551            .get(&topic_arn)
552            .ok_or_else(|| not_found("Topic"))?;
553
554        let subs_confirmed = state
555            .subscriptions
556            .values()
557            .filter(|s| s.topic_arn == topic_arn && s.confirmed)
558            .count();
559        let subs_pending = state
560            .subscriptions
561            .values()
562            .filter(|s| s.topic_arn == topic_arn && !s.confirmed)
563            .count();
564
565        let mut entries = vec![
566            format_attr("TopicArn", &topic.topic_arn),
567            format_attr("Owner", &state.account_id),
568            format_attr("SubscriptionsConfirmed", &subs_confirmed.to_string()),
569            format_attr("SubscriptionsPending", &subs_pending.to_string()),
570            format_attr("SubscriptionsDeleted", "0"),
571        ];
572
573        // Add EffectiveDeliveryPolicy
574        entries.push(format_attr(
575            "EffectiveDeliveryPolicy",
576            DEFAULT_EFFECTIVE_DELIVERY_POLICY,
577        ));
578
579        // Add all stored attributes
580        for (k, v) in &topic.attributes {
581            entries.push(format_attr(k, v));
582        }
583
584        let attrs = entries.join("\n");
585        Ok(xml_resp(
586            &format!(
587                r#"<GetTopicAttributesResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
588  <GetTopicAttributesResult>
589    <Attributes>
590{attrs}
591    </Attributes>
592  </GetTopicAttributesResult>
593  <ResponseMetadata>
594    <RequestId>{}</RequestId>
595  </ResponseMetadata>
596</GetTopicAttributesResponse>"#,
597                req.request_id
598            ),
599            &req.request_id,
600        ))
601    }
602
603    fn set_topic_attributes(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
604        let topic_arn = required(req, "TopicArn")?;
605        let attr_name = required(req, "AttributeName")?;
606        let attr_value = param(req, "AttributeValue").unwrap_or_default();
607
608        let mut state = self.state.write();
609        let topic = state
610            .topics
611            .get_mut(&topic_arn)
612            .ok_or_else(|| not_found("Topic"))?;
613
614        // If setting Policy, compact the JSON
615        if attr_name == "Policy" {
616            if let Ok(parsed) = serde_json::from_str::<Value>(&attr_value) {
617                if let Ok(compact) = serde_json::to_string(&parsed) {
618                    topic.attributes.insert(attr_name, compact);
619                } else {
620                    topic.attributes.insert(attr_name, attr_value);
621                }
622            } else {
623                topic.attributes.insert(attr_name, attr_value);
624            }
625        } else {
626            topic.attributes.insert(attr_name, attr_value);
627        }
628
629        Ok(xml_resp(
630            &format!(
631                r#"<SetTopicAttributesResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
632  <ResponseMetadata>
633    <RequestId>{}</RequestId>
634  </ResponseMetadata>
635</SetTopicAttributesResponse>"#,
636                req.request_id
637            ),
638            &req.request_id,
639        ))
640    }
641
642    fn subscribe(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
643        let topic_arn = required(req, "TopicArn")?;
644        let protocol = required(req, "Protocol")?;
645        let endpoint = param(req, "Endpoint").unwrap_or_default();
646
647        let state_r = self.state.read();
648        let topic = state_r
649            .topics
650            .get(&topic_arn)
651            .ok_or_else(|| not_found("Topic"))?;
652        let is_fifo_topic = topic.is_fifo;
653        let account_id = state_r.account_id.clone();
654
655        // Validate application endpoint exists
656        if protocol == "application" {
657            let endpoint_exists = state_r
658                .platform_applications
659                .values()
660                .any(|app| app.endpoints.contains_key(&endpoint));
661            if !endpoint_exists {
662                return Err(AwsServiceError::aws_error(
663                    StatusCode::BAD_REQUEST,
664                    "InvalidParameter",
665                    format!(
666                        "Invalid parameter: Endpoint Reason: Endpoint does not exist for endpoint {endpoint}"
667                    ),
668                ));
669            }
670        }
671        drop(state_r);
672
673        // Validate SMS endpoint
674        if protocol == "sms" {
675            validate_sms_endpoint(&endpoint)?;
676        }
677
678        // Validate SQS endpoint (must be an ARN)
679        if protocol == "sqs" && !endpoint.starts_with("arn:aws:sqs:") {
680            return Err(AwsServiceError::aws_error(
681                StatusCode::BAD_REQUEST,
682                "InvalidParameter",
683                "Invalid parameter: SQS endpoint ARN",
684            ));
685        }
686
687        // Validate: FIFO SQS queues can only be subscribed to FIFO topics
688        if protocol == "sqs" && endpoint.ends_with(".fifo") && !is_fifo_topic {
689            return Err(AwsServiceError::aws_error(
690                StatusCode::BAD_REQUEST,
691                "InvalidParameter",
692                "Invalid parameter: Invalid parameter: Endpoint Reason: FIFO SQS Queues can not be subscribed to standard SNS topics",
693            ));
694        }
695
696        // Parse subscription attributes
697        let sub_attrs = parse_entries(req, "Attributes");
698
699        // Validate subscription attribute names
700        for key in sub_attrs.keys() {
701            if !VALID_SUBSCRIPTION_ATTRS.contains(&key.as_str()) {
702                return Err(AwsServiceError::aws_error(
703                    StatusCode::BAD_REQUEST,
704                    "InvalidParameter",
705                    format!("Invalid parameter: Attributes Reason: Unknown attribute: {key}"),
706                ));
707            }
708        }
709
710        // Validate and auto-set FilterPolicy
711        let mut attributes = sub_attrs;
712        if let Some(fp) = attributes.get("FilterPolicy") {
713            if !fp.is_empty() {
714                validate_filter_policy(fp)?;
715            }
716            if !attributes.contains_key("FilterPolicyScope") {
717                attributes.insert(
718                    "FilterPolicyScope".to_string(),
719                    "MessageAttributes".to_string(),
720                );
721            }
722        }
723
724        // Check for duplicate subscription (same topic, protocol, endpoint)
725        let mut state = self.state.write();
726        for sub in state.subscriptions.values() {
727            if sub.topic_arn == topic_arn && sub.protocol == protocol && sub.endpoint == endpoint {
728                return Ok(xml_resp(
729                    &format!(
730                        r#"<SubscribeResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
731  <SubscribeResult>
732    <SubscriptionArn>{}</SubscriptionArn>
733  </SubscribeResult>
734  <ResponseMetadata>
735    <RequestId>{}</RequestId>
736  </ResponseMetadata>
737</SubscribeResponse>"#,
738                        sub.subscription_arn, req.request_id
739                    ),
740                    &req.request_id,
741                ));
742            }
743        }
744
745        let sub_arn = format!("{}:{}", topic_arn, uuid::Uuid::new_v4());
746
747        // HTTP/HTTPS subscriptions start as pending (require confirmation)
748        let confirmed = protocol != "http" && protocol != "https";
749        let response_arn = if confirmed {
750            sub_arn.clone()
751        } else {
752            "pending confirmation".to_string()
753        };
754
755        // Generate a confirmation token for pending subscriptions
756        let confirmation_token = if !confirmed {
757            Some(uuid::Uuid::new_v4().to_string())
758        } else {
759            None
760        };
761
762        let sub = SnsSubscription {
763            subscription_arn: sub_arn.clone(),
764            topic_arn,
765            protocol,
766            endpoint,
767            owner: account_id,
768            attributes,
769            confirmed,
770            confirmation_token,
771        };
772
773        state.subscriptions.insert(sub_arn.clone(), sub);
774
775        Ok(xml_resp(
776            &format!(
777                r#"<SubscribeResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
778  <SubscribeResult>
779    <SubscriptionArn>{response_arn}</SubscriptionArn>
780  </SubscribeResult>
781  <ResponseMetadata>
782    <RequestId>{}</RequestId>
783  </ResponseMetadata>
784</SubscribeResponse>"#,
785                req.request_id
786            ),
787            &req.request_id,
788        ))
789    }
790
791    fn confirm_subscription(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
792        let topic_arn = required(req, "TopicArn")?;
793        let token = required(req, "Token")?;
794
795        let mut state = self.state.write();
796        // AWS accepts both the confirmation token and the subscription ARN as the Token parameter.
797        // Confirming an already-confirmed subscription is a no-op (idempotent).
798        let sub_arn = state
799            .subscriptions
800            .values()
801            .find(|s| {
802                s.topic_arn == topic_arn
803                    && (s.confirmation_token.as_deref() == Some(&token)
804                        || s.subscription_arn == token)
805            })
806            .map(|s| s.subscription_arn.clone())
807            .ok_or_else(|| {
808                AwsServiceError::aws_error(
809                    StatusCode::NOT_FOUND,
810                    "NotFound",
811                    format!("No pending subscription found for token: {token}"),
812                )
813            })?;
814
815        // Mark the subscription as confirmed
816        if let Some(sub) = state.subscriptions.get_mut(&sub_arn) {
817            sub.confirmed = true;
818        }
819
820        Ok(xml_resp(
821            &format!(
822                r#"<ConfirmSubscriptionResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
823  <ConfirmSubscriptionResult>
824    <SubscriptionArn>{sub_arn}</SubscriptionArn>
825  </ConfirmSubscriptionResult>
826  <ResponseMetadata>
827    <RequestId>{}</RequestId>
828  </ResponseMetadata>
829</ConfirmSubscriptionResponse>"#,
830                req.request_id
831            ),
832            &req.request_id,
833        ))
834    }
835
836    fn unsubscribe(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
837        let sub_arn = required(req, "SubscriptionArn")?;
838        self.state.write().subscriptions.remove(&sub_arn);
839
840        Ok(xml_resp(
841            &format!(
842                r#"<UnsubscribeResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
843  <ResponseMetadata>
844    <RequestId>{}</RequestId>
845  </ResponseMetadata>
846</UnsubscribeResponse>"#,
847                req.request_id
848            ),
849            &req.request_id,
850        ))
851    }
852
853    fn publish(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
854        // Either TopicArn or TargetArn is required; also allow PhoneNumber for SMS
855        let topic_arn = param(req, "TopicArn").or_else(|| param(req, "TargetArn"));
856        let phone_number = param(req, "PhoneNumber");
857
858        if topic_arn.is_none() && phone_number.is_none() {
859            return Err(AwsServiceError::aws_error(
860                StatusCode::BAD_REQUEST,
861                "InvalidParameter",
862                "The request must contain the parameter TopicArn or TargetArn or PhoneNumber",
863            ));
864        }
865
866        let message = required(req, "Message")?;
867        let subject = param(req, "Subject");
868        let message_group_id = param(req, "MessageGroupId");
869        let message_dedup_id = param(req, "MessageDeduplicationId");
870        let message_structure = param(req, "MessageStructure");
871
872        // Validate subject length
873        if let Some(ref subj) = subject {
874            if subj.len() > 100 {
875                return Err(AwsServiceError::aws_error(
876                    StatusCode::BAD_REQUEST,
877                    "InvalidParameter",
878                    "Subject must be less than 100 characters",
879                ));
880            }
881        }
882
883        // Validate message length (256KB)
884        if message.len() > 262144 {
885            return Err(AwsServiceError::aws_error(
886                StatusCode::BAD_REQUEST,
887                "InvalidParameter",
888                "Invalid parameter: Message too long",
889            ));
890        }
891
892        // Validate MessageStructure=json
893        if message_structure.as_deref() == Some("json") {
894            validate_message_structure_json(&message)?;
895        }
896
897        // Parse MessageAttributes from query params
898        let message_attributes = parse_message_attributes(req);
899
900        if let Some(ref phone) = phone_number {
901            return self.publish_to_phone_number(
902                req,
903                phone,
904                message,
905                subject,
906                message_attributes,
907                message_group_id,
908                message_dedup_id,
909            );
910        }
911
912        let topic_arn = topic_arn.ok_or_else(|| {
913            AwsServiceError::aws_error(
914                StatusCode::BAD_REQUEST,
915                "InvalidParameter",
916                "TopicArn or TargetArn is required",
917            )
918        })?;
919
920        // Check if it's a platform endpoint ARN
921        if topic_arn.contains(":endpoint/") {
922            return self.publish_to_platform_endpoint(
923                &topic_arn,
924                &message,
925                &message_attributes,
926                &req.request_id,
927            );
928        }
929
930        let mut state = self.state.write();
931        let topic = state
932            .topics
933            .get(&topic_arn)
934            .ok_or_else(|| not_found("Topic"))?;
935
936        // FIFO topic enforcement
937        if topic.is_fifo {
938            if message_group_id.is_none() {
939                return Err(AwsServiceError::aws_error(
940                    StatusCode::BAD_REQUEST,
941                    "InvalidParameter",
942                    "Invalid parameter: The request must contain the parameter MessageGroupId.",
943                ));
944            }
945            // FIFO topics require deduplication: either ContentBasedDeduplication or explicit ID
946            let content_dedup = topic
947                .attributes
948                .get("ContentBasedDeduplication")
949                .map(|v| v == "true")
950                .unwrap_or(false);
951            if !content_dedup && message_dedup_id.is_none() {
952                return Err(AwsServiceError::aws_error(
953                    StatusCode::BAD_REQUEST,
954                    "InvalidParameter",
955                    "Invalid parameter: The topic should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly",
956                ));
957            }
958        } else {
959            // Non-FIFO: MessageGroupId is allowed (forwarded to SQS for fair queuing)
960            // But DeduplicationId is NOT allowed on non-FIFO topics
961            if message_dedup_id.is_some() {
962                return Err(AwsServiceError::aws_error(
963                    StatusCode::BAD_REQUEST,
964                    "InvalidParameter",
965                    "Invalid parameter: The request includes MessageDeduplicationId parameter that is not valid for this topic type",
966                ));
967            }
968        }
969
970        let msg_id = uuid::Uuid::new_v4().to_string();
971        state.published.push(PublishedMessage {
972            message_id: msg_id.clone(),
973            topic_arn: topic_arn.clone(),
974            message: message.clone(),
975            subject: subject.clone(),
976            message_attributes: message_attributes.clone(),
977            message_group_id: message_group_id.clone(),
978            message_dedup_id: message_dedup_id.clone(),
979            timestamp: Utc::now(),
980        });
981
982        // Resolve the actual message per protocol for MessageStructure=json
983        let parsed_structure: Option<Value> = if message_structure.as_deref() == Some("json") {
984            Some(serde_json::from_str(&message).map_err(|_| {
985                AwsServiceError::aws_error(
986                    StatusCode::BAD_REQUEST,
987                    "InvalidParameter",
988                    "Invalid parameter: Message Structure - No JSON message body is parseable",
989                )
990            })?)
991        } else {
992            None
993        };
994
995        let subscribers =
996            collect_topic_subscribers(&state, &topic_arn, &message_attributes, &message);
997        let endpoint = state.endpoint.clone();
998        drop(state);
999
1000        // Determine actual message content per protocol
1001        let sqs_message = if let Some(ref structure) = parsed_structure {
1002            structure
1003                .get("sqs")
1004                .or_else(|| structure.get("default"))
1005                .and_then(|v| v.as_str())
1006                .unwrap_or(&message)
1007                .to_string()
1008        } else {
1009            message.clone()
1010        };
1011
1012        let default_message = if let Some(ref structure) = parsed_structure {
1013            structure
1014                .get("default")
1015                .and_then(|v| v.as_str())
1016                .unwrap_or(&message)
1017                .to_string()
1018        } else {
1019            message.clone()
1020        };
1021
1022        let envelope_attrs = build_envelope_attrs(&message_attributes);
1023
1024        let ctx = TopicFanoutContext {
1025            msg_id: &msg_id,
1026            topic_arn: &topic_arn,
1027            subject: subject.as_deref(),
1028            endpoint: &endpoint,
1029            sqs_message: &sqs_message,
1030            default_message: &default_message,
1031            envelope_attrs: &envelope_attrs,
1032            message_attributes: &message_attributes,
1033            message_group_id: message_group_id.as_deref(),
1034            message_dedup_id: message_dedup_id.as_deref(),
1035        };
1036
1037        self.deliver_to_sqs_subscribers(&subscribers.sqs, &ctx);
1038        self.deliver_to_http_subscribers(&subscribers.http, &ctx);
1039        self.deliver_to_lambda_subscribers(&subscribers.lambda, &ctx);
1040        self.deliver_to_email_subscribers(&subscribers.email, &ctx);
1041        self.deliver_to_sms_subscribers(&subscribers.sms, &ctx);
1042
1043        Ok(xml_resp(
1044            &format!(
1045                r#"<PublishResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
1046  <PublishResult>
1047    <MessageId>{msg_id}</MessageId>
1048  </PublishResult>
1049  <ResponseMetadata>
1050    <RequestId>{}</RequestId>
1051  </ResponseMetadata>
1052</PublishResponse>"#,
1053                req.request_id
1054            ),
1055            &req.request_id,
1056        ))
1057    }
1058
1059    fn deliver_to_sqs_subscribers(&self, subs: &[(String, bool)], ctx: &TopicFanoutContext<'_>) {
1060        for (queue_arn, raw) in subs {
1061            if *raw {
1062                let mut sqs_msg_attrs = HashMap::new();
1063                for (k, v) in ctx.message_attributes {
1064                    let mut attr = fakecloud_core::delivery::SqsMessageAttribute {
1065                        data_type: v.data_type.clone(),
1066                        string_value: v.string_value.clone(),
1067                        binary_value: None,
1068                    };
1069                    if let Some(ref bv) = v.binary_value {
1070                        attr.binary_value =
1071                            Some(base64::engine::general_purpose::STANDARD.encode(bv));
1072                    }
1073                    sqs_msg_attrs.insert(k.clone(), attr);
1074                }
1075                self.delivery.send_to_sqs_with_attrs(
1076                    queue_arn,
1077                    ctx.sqs_message,
1078                    &sqs_msg_attrs,
1079                    ctx.message_group_id,
1080                    ctx.message_dedup_id,
1081                );
1082            } else {
1083                let envelope_str = build_sns_envelope(
1084                    ctx.msg_id,
1085                    ctx.topic_arn,
1086                    &ctx.subject.map(|s| s.to_string()),
1087                    ctx.sqs_message,
1088                    ctx.envelope_attrs,
1089                    ctx.endpoint,
1090                );
1091                self.delivery
1092                    .send_to_sqs(queue_arn, &envelope_str, &HashMap::new());
1093            }
1094        }
1095    }
1096
1097    fn deliver_to_http_subscribers(&self, subs: &[String], ctx: &TopicFanoutContext<'_>) {
1098        for endpoint_url in subs {
1099            let body = build_sns_envelope(
1100                ctx.msg_id,
1101                ctx.topic_arn,
1102                &ctx.subject.map(|s| s.to_string()),
1103                ctx.default_message,
1104                ctx.envelope_attrs,
1105                ctx.endpoint,
1106            );
1107            let endpoint_url = endpoint_url.clone();
1108            let topic = ctx.topic_arn.to_string();
1109            tokio::spawn(async move {
1110                let client = reqwest::Client::new();
1111                let result = client
1112                    .post(&endpoint_url)
1113                    .header("Content-Type", "application/json")
1114                    .header("x-amz-sns-message-type", "Notification")
1115                    .header("x-amz-sns-topic-arn", &topic)
1116                    .body(body)
1117                    .send()
1118                    .await;
1119                if let Err(e) = result {
1120                    tracing::warn!(endpoint = %endpoint_url, error = %e, "SNS HTTP delivery failed");
1121                }
1122            });
1123        }
1124    }
1125
1126    fn deliver_to_lambda_subscribers(
1127        &self,
1128        subs: &[(String, String)],
1129        ctx: &TopicFanoutContext<'_>,
1130    ) {
1131        if subs.is_empty() {
1132            return;
1133        }
1134        let now = Utc::now();
1135        let subject_owned = ctx.subject.map(|s| s.to_string());
1136
1137        let lambda_payloads: Vec<(String, String)> = subs
1138            .iter()
1139            .map(|(function_arn, subscription_arn)| {
1140                let payload = build_sns_lambda_event(&SnsLambdaEventInput {
1141                    message_id: ctx.msg_id,
1142                    topic_arn: ctx.topic_arn,
1143                    subscription_arn,
1144                    message: ctx.default_message,
1145                    subject: ctx.subject,
1146                    message_attributes: ctx.envelope_attrs,
1147                    timestamp: &now,
1148                    endpoint: ctx.endpoint,
1149                });
1150                (function_arn.clone(), payload)
1151            })
1152            .collect();
1153
1154        {
1155            let mut state = self.state.write();
1156            for (function_arn, _) in &lambda_payloads {
1157                state
1158                    .lambda_invocations
1159                    .push(crate::state::LambdaInvocation {
1160                        function_arn: function_arn.clone(),
1161                        message: ctx.default_message.to_string(),
1162                        subject: subject_owned.clone(),
1163                        timestamp: now,
1164                    });
1165            }
1166        }
1167
1168        let delivery = self.delivery.clone();
1169        tokio::spawn(async move {
1170            for (function_arn, payload) in lambda_payloads {
1171                tracing::info!(function_arn = %function_arn, "SNS invoking Lambda function");
1172                match delivery.invoke_lambda(&function_arn, &payload).await {
1173                    Some(Ok(_)) => {
1174                        tracing::info!(
1175                            function_arn = %function_arn,
1176                            "SNS->Lambda invocation succeeded"
1177                        );
1178                    }
1179                    Some(Err(e)) => {
1180                        tracing::error!(
1181                            function_arn = %function_arn,
1182                            error = %e,
1183                            "SNS->Lambda invocation failed"
1184                        );
1185                    }
1186                    None => {
1187                        tracing::debug!(
1188                            function_arn = %function_arn,
1189                            "SNS->Lambda: no container runtime, skipping real execution"
1190                        );
1191                    }
1192                }
1193            }
1194        });
1195    }
1196
1197    fn deliver_to_email_subscribers(&self, subs: &[String], ctx: &TopicFanoutContext<'_>) {
1198        if subs.is_empty() {
1199            return;
1200        }
1201        let now = Utc::now();
1202        let subject_owned = ctx.subject.map(|s| s.to_string());
1203        let mut state = self.state.write();
1204        for email_address in subs {
1205            tracing::info!(
1206                email = %email_address,
1207                topic_arn = %ctx.topic_arn,
1208                "SNS delivering to email (stub)"
1209            );
1210            state.sent_emails.push(crate::state::SentEmail {
1211                email_address: email_address.clone(),
1212                message: ctx.default_message.to_string(),
1213                subject: subject_owned.clone(),
1214                topic_arn: ctx.topic_arn.to_string(),
1215                timestamp: now,
1216            });
1217        }
1218    }
1219
1220    fn deliver_to_sms_subscribers(&self, subs: &[String], ctx: &TopicFanoutContext<'_>) {
1221        if subs.is_empty() {
1222            return;
1223        }
1224        let mut state = self.state.write();
1225        for phone_number in subs {
1226            tracing::info!(
1227                phone_number = %phone_number,
1228                topic_arn = %ctx.topic_arn,
1229                "SNS delivering to SMS (stub)"
1230            );
1231            state
1232                .sms_messages
1233                .push((phone_number.clone(), ctx.default_message.to_string()));
1234        }
1235    }
1236
1237    fn publish_batch(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1238        let topic_arn = required(req, "TopicArn")?;
1239
1240        let state = self.state.read();
1241        let topic = state
1242            .topics
1243            .get(&topic_arn)
1244            .ok_or_else(|| not_found("Topic"))?;
1245        let is_fifo = topic.is_fifo;
1246        let endpoint = state.endpoint.clone();
1247        drop(state);
1248
1249        // Parse batch entries: PublishBatchRequestEntries.member.N.*
1250        let mut entries = Vec::new();
1251        for n in 1..=100 {
1252            let id_key = format!("PublishBatchRequestEntries.member.{n}.Id");
1253            if let Some(id) = req.query_params.get(&id_key) {
1254                let msg_key = format!("PublishBatchRequestEntries.member.{n}.Message");
1255                let message = req.query_params.get(&msg_key).cloned().unwrap_or_default();
1256                let subject_key = format!("PublishBatchRequestEntries.member.{n}.Subject");
1257                let subject = req.query_params.get(&subject_key).cloned();
1258                let group_key = format!("PublishBatchRequestEntries.member.{n}.MessageGroupId");
1259                let group_id = req.query_params.get(&group_key).cloned();
1260                let dedup_key =
1261                    format!("PublishBatchRequestEntries.member.{n}.MessageDeduplicationId");
1262                let dedup_id = req.query_params.get(&dedup_key).cloned();
1263                let structure_key =
1264                    format!("PublishBatchRequestEntries.member.{n}.MessageStructure");
1265                let message_structure = req.query_params.get(&structure_key).cloned();
1266                entries.push((
1267                    id.clone(),
1268                    message,
1269                    subject,
1270                    group_id,
1271                    dedup_id,
1272                    message_structure,
1273                ));
1274            } else {
1275                break;
1276            }
1277        }
1278
1279        // Validate: max 10 entries
1280        if entries.len() > 10 {
1281            return Err(AwsServiceError::aws_error(
1282                StatusCode::BAD_REQUEST,
1283                "TooManyEntriesInBatchRequest",
1284                "The batch request contains more entries than permissible.",
1285            ));
1286        }
1287
1288        // Validate: unique IDs
1289        let ids: Vec<&str> = entries.iter().map(|e| e.0.as_str()).collect();
1290        let unique_ids: std::collections::HashSet<&str> = ids.iter().copied().collect();
1291        if unique_ids.len() != ids.len() {
1292            return Err(AwsServiceError::aws_error(
1293                StatusCode::BAD_REQUEST,
1294                "BatchEntryIdsNotDistinct",
1295                "Two or more batch entries in the request have the same Id.",
1296            ));
1297        }
1298
1299        // FIFO: all entries must have MessageGroupId — this is a top-level error
1300        if is_fifo && entries.iter().any(|e| e.3.is_none()) {
1301            return Err(AwsServiceError::aws_error(
1302                StatusCode::BAD_REQUEST,
1303                "InvalidParameter",
1304                "Invalid parameter: The MessageGroupId parameter is required for FIFO topics",
1305            ));
1306        }
1307
1308        let mut successful = Vec::new();
1309        let failed: Vec<String> = Vec::new();
1310
1311        for (idx, (id, message, subject, group_id, dedup_id, structure)) in
1312            entries.iter().enumerate()
1313        {
1314            // Parse per-entry message attributes
1315            let batch_attrs = parse_batch_message_attributes(req, idx + 1);
1316
1317            // Validate MessageStructure=json
1318            if structure.as_deref() == Some("json") {
1319                validate_message_structure_json(message)?;
1320            }
1321
1322            let msg_id = uuid::Uuid::new_v4().to_string();
1323            let mut state = self.state.write();
1324            state.published.push(PublishedMessage {
1325                message_id: msg_id.clone(),
1326                topic_arn: topic_arn.clone(),
1327                message: message.clone(),
1328                subject: subject.clone(),
1329                message_attributes: batch_attrs.clone(),
1330                message_group_id: group_id.clone(),
1331                message_dedup_id: dedup_id.clone(),
1332                timestamp: Utc::now(),
1333            });
1334
1335            // Resolve message for SQS via MessageStructure=json
1336            let parsed_structure: Option<Value> = if structure.as_deref() == Some("json") {
1337                Some(serde_json::from_str(message).map_err(|_| {
1338                    AwsServiceError::aws_error(
1339                        StatusCode::BAD_REQUEST,
1340                        "InvalidParameter",
1341                        "Invalid parameter: Message Structure - No JSON message body is parseable",
1342                    )
1343                })?)
1344            } else {
1345                None
1346            };
1347            let sqs_message = if let Some(ref s) = parsed_structure {
1348                s.get("sqs")
1349                    .or_else(|| s.get("default"))
1350                    .and_then(|v| v.as_str())
1351                    .unwrap_or(message)
1352                    .to_string()
1353            } else {
1354                message.clone()
1355            };
1356
1357            // Deliver to SQS subscribers
1358            let sqs_subscribers: Vec<(String, bool)> = state
1359                .subscriptions
1360                .values()
1361                .filter(|s| s.topic_arn == topic_arn && s.protocol == "sqs" && s.confirmed)
1362                .map(|s| {
1363                    let raw = s
1364                        .attributes
1365                        .get("RawMessageDelivery")
1366                        .map(|v| v == "true")
1367                        .unwrap_or(false);
1368                    (s.endpoint.clone(), raw)
1369                })
1370                .collect();
1371            drop(state);
1372
1373            // Build envelope attributes
1374            let mut envelope_attrs = serde_json::Map::new();
1375            for (key, attr) in &batch_attrs {
1376                let mut attr_obj = serde_json::Map::new();
1377                attr_obj.insert("Type".to_string(), Value::String(attr.data_type.clone()));
1378                if let Some(ref sv) = attr.string_value {
1379                    attr_obj.insert("Value".to_string(), Value::String(sv.clone()));
1380                }
1381                if let Some(ref bv) = attr.binary_value {
1382                    attr_obj.insert(
1383                        "Value".to_string(),
1384                        Value::String(base64::engine::general_purpose::STANDARD.encode(bv)),
1385                    );
1386                }
1387                envelope_attrs.insert(key.clone(), Value::Object(attr_obj));
1388            }
1389
1390            for (queue_arn, raw) in &sqs_subscribers {
1391                if *raw {
1392                    let mut sqs_msg_attrs = HashMap::new();
1393                    for (k, v) in &batch_attrs {
1394                        let mut attr = fakecloud_core::delivery::SqsMessageAttribute {
1395                            data_type: v.data_type.clone(),
1396                            string_value: v.string_value.clone(),
1397                            binary_value: None,
1398                        };
1399                        if let Some(ref bv) = v.binary_value {
1400                            attr.binary_value =
1401                                Some(base64::engine::general_purpose::STANDARD.encode(bv));
1402                        }
1403                        sqs_msg_attrs.insert(k.clone(), attr);
1404                    }
1405                    self.delivery.send_to_sqs_with_attrs(
1406                        queue_arn,
1407                        &sqs_message,
1408                        &sqs_msg_attrs,
1409                        if is_fifo { group_id.as_deref() } else { None },
1410                        if is_fifo { dedup_id.as_deref() } else { None },
1411                    );
1412                } else {
1413                    let envelope_str = build_sns_envelope(
1414                        &msg_id,
1415                        &topic_arn,
1416                        subject,
1417                        &sqs_message,
1418                        &envelope_attrs,
1419                        &endpoint,
1420                    );
1421                    self.delivery
1422                        .send_to_sqs(queue_arn, &envelope_str, &HashMap::new());
1423                }
1424            }
1425
1426            successful.push(format!(
1427                r#"    <member>
1428      <Id>{id}</Id>
1429      <MessageId>{msg_id}</MessageId>
1430    </member>"#
1431            ));
1432        }
1433
1434        let successful_xml = successful.join("\n");
1435        let failed_xml = failed.join("\n");
1436
1437        Ok(xml_resp(
1438            &format!(
1439                r#"<PublishBatchResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
1440  <PublishBatchResult>
1441    <Successful>
1442{successful_xml}
1443    </Successful>
1444    <Failed>
1445{failed_xml}
1446    </Failed>
1447  </PublishBatchResult>
1448  <ResponseMetadata>
1449    <RequestId>{}</RequestId>
1450  </ResponseMetadata>
1451</PublishBatchResponse>"#,
1452                req.request_id
1453            ),
1454            &req.request_id,
1455        ))
1456    }
1457
1458    /// Publish directly to an SMS destination (`Publish` called with
1459    /// `PhoneNumber` instead of `TopicArn` / `TargetArn`). This path
1460    /// does its own length and E.164 validation since AWS reports
1461    /// distinct error messages for SMS.
1462    #[allow(clippy::too_many_arguments)]
1463    fn publish_to_phone_number(
1464        &self,
1465        req: &AwsRequest,
1466        phone: &str,
1467        message: String,
1468        subject: Option<String>,
1469        message_attributes: HashMap<String, MessageAttribute>,
1470        message_group_id: Option<String>,
1471        message_dedup_id: Option<String>,
1472    ) -> Result<AwsResponse, AwsServiceError> {
1473        let is_valid_e164 = phone.starts_with('+')
1474            && phone.len() >= 2
1475            && phone[1..].chars().all(|c| c.is_ascii_digit());
1476        if !is_valid_e164 {
1477            return Err(AwsServiceError::aws_error(
1478                StatusCode::BAD_REQUEST,
1479                "InvalidParameter",
1480                format!(
1481                    "Invalid parameter: PhoneNumber Reason: {phone} does not meet the E164 format"
1482                ),
1483            ));
1484        }
1485
1486        if message.len() > 1600 {
1487            return Err(AwsServiceError::aws_error(
1488                StatusCode::BAD_REQUEST,
1489                "InvalidParameter",
1490                "Invalid parameter: Message Reason: Message must be less than 1600 characters long",
1491            ));
1492        }
1493
1494        let msg_id = uuid::Uuid::new_v4().to_string();
1495        let mut state = self.state.write();
1496        state
1497            .sms_messages
1498            .push((phone.to_string(), message.clone()));
1499        state.published.push(PublishedMessage {
1500            message_id: msg_id.clone(),
1501            topic_arn: String::new(),
1502            message,
1503            subject,
1504            message_attributes,
1505            message_group_id,
1506            message_dedup_id,
1507            timestamp: Utc::now(),
1508        });
1509
1510        Ok(xml_resp(
1511            &format!(
1512                r#"<PublishResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
1513  <PublishResult>
1514    <MessageId>{msg_id}</MessageId>
1515  </PublishResult>
1516  <ResponseMetadata>
1517    <RequestId>{}</RequestId>
1518  </ResponseMetadata>
1519</PublishResponse>"#,
1520                req.request_id
1521            ),
1522            &req.request_id,
1523        ))
1524    }
1525
1526    fn publish_to_platform_endpoint(
1527        &self,
1528        endpoint_arn: &str,
1529        message: &str,
1530        message_attributes: &HashMap<String, MessageAttribute>,
1531        request_id: &str,
1532    ) -> Result<AwsResponse, AwsServiceError> {
1533        let state = self.state.read();
1534
1535        // Find the platform endpoint
1536        let mut found_endpoint: Option<&PlatformEndpoint> = None;
1537        for app in state.platform_applications.values() {
1538            if let Some(ep) = app.endpoints.get(endpoint_arn) {
1539                found_endpoint = Some(ep);
1540                break;
1541            }
1542        }
1543
1544        let ep = found_endpoint.ok_or_else(|| {
1545            AwsServiceError::aws_error(StatusCode::NOT_FOUND, "NotFound", "Endpoint does not exist")
1546        })?;
1547
1548        if !ep.enabled {
1549            return Err(AwsServiceError::aws_error(
1550                StatusCode::BAD_REQUEST,
1551                "EndpointDisabled",
1552                "Endpoint is disabled",
1553            ));
1554        }
1555        drop(state);
1556
1557        let msg_id = uuid::Uuid::new_v4().to_string();
1558        let mut state = self.state.write();
1559        // Store message on the endpoint
1560        for app in state.platform_applications.values_mut() {
1561            if let Some(ep) = app.endpoints.get_mut(endpoint_arn) {
1562                ep.messages.push(PublishedMessage {
1563                    message_id: msg_id.clone(),
1564                    topic_arn: endpoint_arn.to_string(),
1565                    message: message.to_string(),
1566                    subject: None,
1567                    message_attributes: message_attributes.clone(),
1568                    message_group_id: None,
1569                    message_dedup_id: None,
1570                    timestamp: Utc::now(),
1571                });
1572                break;
1573            }
1574        }
1575
1576        Ok(xml_resp(
1577            &format!(
1578                r#"<PublishResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
1579  <PublishResult>
1580    <MessageId>{msg_id}</MessageId>
1581  </PublishResult>
1582  <ResponseMetadata>
1583    <RequestId>{request_id}</RequestId>
1584  </ResponseMetadata>
1585</PublishResponse>"#,
1586            ),
1587            request_id,
1588        ))
1589    }
1590
1591    fn list_subscriptions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1592        let state = self.state.read();
1593
1594        let all_subs: Vec<&SnsSubscription> = state.subscriptions.values().collect();
1595        let next_token = param(req, "NextToken")
1596            .and_then(|t| t.parse::<usize>().ok())
1597            .unwrap_or(0);
1598        let next_token = next_token.min(all_subs.len());
1599
1600        let page = &all_subs[next_token..];
1601        let has_more = page.len() > DEFAULT_PAGE_SIZE;
1602        let page = if has_more {
1603            &page[..DEFAULT_PAGE_SIZE]
1604        } else {
1605            page
1606        };
1607
1608        let members: String = page
1609            .iter()
1610            .map(|s| format_sub_member(s))
1611            .collect::<Vec<_>>()
1612            .join("\n");
1613
1614        let next_token_xml = if has_more {
1615            format!(
1616                "\n    <NextToken>{}</NextToken>",
1617                next_token + DEFAULT_PAGE_SIZE
1618            )
1619        } else {
1620            String::new()
1621        };
1622
1623        Ok(xml_resp(
1624            &format!(
1625                r#"<ListSubscriptionsResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
1626  <ListSubscriptionsResult>
1627    <Subscriptions>
1628{members}
1629    </Subscriptions>{next_token_xml}
1630  </ListSubscriptionsResult>
1631  <ResponseMetadata>
1632    <RequestId>{}</RequestId>
1633  </ResponseMetadata>
1634</ListSubscriptionsResponse>"#,
1635                req.request_id
1636            ),
1637            &req.request_id,
1638        ))
1639    }
1640
1641    fn list_subscriptions_by_topic(
1642        &self,
1643        req: &AwsRequest,
1644    ) -> Result<AwsResponse, AwsServiceError> {
1645        let topic_arn = required(req, "TopicArn")?;
1646        let state = self.state.read();
1647
1648        let all_subs: Vec<&SnsSubscription> = state
1649            .subscriptions
1650            .values()
1651            .filter(|s| s.topic_arn == topic_arn)
1652            .collect();
1653
1654        let next_token = param(req, "NextToken")
1655            .and_then(|t| t.parse::<usize>().ok())
1656            .unwrap_or(0);
1657        let next_token = next_token.min(all_subs.len());
1658
1659        let page = &all_subs[next_token..];
1660        let has_more = page.len() > DEFAULT_PAGE_SIZE;
1661        let page = if has_more {
1662            &page[..DEFAULT_PAGE_SIZE]
1663        } else {
1664            page
1665        };
1666
1667        let members: String = page
1668            .iter()
1669            .map(|s| format_sub_member(s))
1670            .collect::<Vec<_>>()
1671            .join("\n");
1672
1673        let next_token_xml = if has_more {
1674            format!(
1675                "\n    <NextToken>{}</NextToken>",
1676                next_token + DEFAULT_PAGE_SIZE
1677            )
1678        } else {
1679            String::new()
1680        };
1681
1682        Ok(xml_resp(
1683            &format!(
1684                r#"<ListSubscriptionsByTopicResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
1685  <ListSubscriptionsByTopicResult>
1686    <Subscriptions>
1687{members}
1688    </Subscriptions>{next_token_xml}
1689  </ListSubscriptionsByTopicResult>
1690  <ResponseMetadata>
1691    <RequestId>{}</RequestId>
1692  </ResponseMetadata>
1693</ListSubscriptionsByTopicResponse>"#,
1694                req.request_id
1695            ),
1696            &req.request_id,
1697        ))
1698    }
1699
1700    fn get_subscription_attributes(
1701        &self,
1702        req: &AwsRequest,
1703    ) -> Result<AwsResponse, AwsServiceError> {
1704        let sub_arn = required(req, "SubscriptionArn")?;
1705        let state = self.state.read();
1706        let sub = state
1707            .subscriptions
1708            .get(&sub_arn)
1709            .ok_or_else(|| not_found("Subscription"))?;
1710
1711        let mut entries = vec![
1712            format_attr("SubscriptionArn", &sub.subscription_arn),
1713            format_attr("TopicArn", &sub.topic_arn),
1714            format_attr("Protocol", &sub.protocol),
1715            format_attr("Endpoint", &sub.endpoint),
1716            format_attr("Owner", &sub.owner),
1717            format_attr("ConfirmationWasAuthenticated", "true"),
1718            format_attr("PendingConfirmation", "false"),
1719        ];
1720
1721        // Add RawMessageDelivery from attributes or default
1722        if !sub.attributes.contains_key("RawMessageDelivery") {
1723            entries.push(format_attr("RawMessageDelivery", "false"));
1724        }
1725
1726        // Add EffectiveDeliveryPolicy
1727        entries.push(format_attr(
1728            "EffectiveDeliveryPolicy",
1729            DEFAULT_EFFECTIVE_DELIVERY_POLICY,
1730        ));
1731
1732        for (k, v) in &sub.attributes {
1733            // Skip empty FilterPolicy (unsetting it removes it)
1734            if k == "FilterPolicy" && v.is_empty() {
1735                continue;
1736            }
1737            // If FilterPolicy is unset, also skip FilterPolicyScope
1738            if k == "FilterPolicyScope" {
1739                let has_filter = sub
1740                    .attributes
1741                    .get("FilterPolicy")
1742                    .map(|v| !v.is_empty())
1743                    .unwrap_or(false);
1744                if !has_filter {
1745                    continue;
1746                }
1747            }
1748            entries.push(format_attr(k, v));
1749        }
1750        let attrs = entries.join("\n");
1751
1752        Ok(xml_resp(
1753            &format!(
1754                r#"<GetSubscriptionAttributesResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
1755  <GetSubscriptionAttributesResult>
1756    <Attributes>
1757{attrs}
1758    </Attributes>
1759  </GetSubscriptionAttributesResult>
1760  <ResponseMetadata>
1761    <RequestId>{}</RequestId>
1762  </ResponseMetadata>
1763</GetSubscriptionAttributesResponse>"#,
1764                req.request_id
1765            ),
1766            &req.request_id,
1767        ))
1768    }
1769
1770    fn set_subscription_attributes(
1771        &self,
1772        req: &AwsRequest,
1773    ) -> Result<AwsResponse, AwsServiceError> {
1774        let sub_arn = required(req, "SubscriptionArn")?;
1775        let attr_name = required(req, "AttributeName")?;
1776        let attr_value = param(req, "AttributeValue").unwrap_or_default();
1777
1778        // Validate attribute name
1779        if !VALID_SUBSCRIPTION_ATTRS.contains(&attr_name.as_str()) {
1780            return Err(AwsServiceError::aws_error(
1781                StatusCode::BAD_REQUEST,
1782                "InvalidParameter",
1783                "Invalid parameter: AttributeName".to_string(),
1784            ));
1785        }
1786
1787        // Validate filter policy
1788        if attr_name == "FilterPolicy" && !attr_value.is_empty() {
1789            validate_filter_policy(&attr_value)?;
1790        }
1791
1792        let mut state = self.state.write();
1793        let sub = state
1794            .subscriptions
1795            .get_mut(&sub_arn)
1796            .ok_or_else(|| not_found("Subscription"))?;
1797
1798        sub.attributes.insert(attr_name.clone(), attr_value.clone());
1799
1800        // Setting FilterPolicy auto-sets FilterPolicyScope
1801        if attr_name == "FilterPolicy" && !attr_value.is_empty() {
1802            sub.attributes
1803                .entry("FilterPolicyScope".to_string())
1804                .or_insert_with(|| "MessageAttributes".to_string());
1805        }
1806
1807        Ok(xml_resp(
1808            &format!(
1809                r#"<SetSubscriptionAttributesResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
1810  <ResponseMetadata>
1811    <RequestId>{}</RequestId>
1812  </ResponseMetadata>
1813</SetSubscriptionAttributesResponse>"#,
1814                req.request_id
1815            ),
1816            &req.request_id,
1817        ))
1818    }
1819
1820    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1821        let resource_arn = required(req, "ResourceArn")?;
1822        let new_tags = parse_tags(req);
1823
1824        let mut state = self.state.write();
1825        let topic = state.topics.get_mut(&resource_arn).ok_or_else(|| {
1826            AwsServiceError::aws_error(
1827                StatusCode::NOT_FOUND,
1828                "ResourceNotFound",
1829                "Resource does not exist",
1830            )
1831        })?;
1832
1833        // Check tag quota: existing + new (after dedup) must not exceed 50
1834        let mut merged = topic.tags.clone();
1835        for (k, v) in &new_tags {
1836            // Update existing or add
1837            if let Some(pos) = merged.iter().position(|(ek, _)| ek == k) {
1838                merged[pos] = (k.clone(), v.clone());
1839            } else {
1840                merged.push((k.clone(), v.clone()));
1841            }
1842        }
1843        if merged.len() > 50 {
1844            return Err(AwsServiceError::aws_error(
1845                StatusCode::BAD_REQUEST,
1846                "TagLimitExceeded",
1847                "Could not complete request: tag quota of per resource exceeded",
1848            ));
1849        }
1850
1851        topic.tags = merged;
1852
1853        Ok(xml_resp(
1854            &format!(
1855                r#"<TagResourceResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
1856  <TagResourceResult/>
1857  <ResponseMetadata>
1858    <RequestId>{}</RequestId>
1859  </ResponseMetadata>
1860</TagResourceResponse>"#,
1861                req.request_id
1862            ),
1863            &req.request_id,
1864        ))
1865    }
1866
1867    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1868        let resource_arn = required(req, "ResourceArn")?;
1869        let tag_keys = parse_tag_keys(req);
1870
1871        let mut state = self.state.write();
1872        let topic = state.topics.get_mut(&resource_arn).ok_or_else(|| {
1873            AwsServiceError::aws_error(
1874                StatusCode::NOT_FOUND,
1875                "ResourceNotFound",
1876                "Resource does not exist",
1877            )
1878        })?;
1879        topic.tags.retain(|(k, _)| !tag_keys.contains(k));
1880
1881        Ok(xml_resp(
1882            &format!(
1883                r#"<UntagResourceResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
1884  <UntagResourceResult/>
1885  <ResponseMetadata>
1886    <RequestId>{}</RequestId>
1887  </ResponseMetadata>
1888</UntagResourceResponse>"#,
1889                req.request_id
1890            ),
1891            &req.request_id,
1892        ))
1893    }
1894
1895    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1896        let resource_arn = required(req, "ResourceArn")?;
1897        let state = self.state.read();
1898        let topic = state.topics.get(&resource_arn).ok_or_else(|| {
1899            AwsServiceError::aws_error(
1900                StatusCode::NOT_FOUND,
1901                "ResourceNotFound",
1902                "Resource does not exist",
1903            )
1904        })?;
1905
1906        let members: String = topic
1907            .tags
1908            .iter()
1909            .map(|(k, v)| format!("      <member><Key>{k}</Key><Value>{v}</Value></member>"))
1910            .collect::<Vec<_>>()
1911            .join("\n");
1912
1913        Ok(xml_resp(
1914            &format!(
1915                r#"<ListTagsForResourceResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
1916  <ListTagsForResourceResult>
1917    <Tags>
1918{members}
1919    </Tags>
1920  </ListTagsForResourceResult>
1921  <ResponseMetadata>
1922    <RequestId>{}</RequestId>
1923  </ResponseMetadata>
1924</ListTagsForResourceResponse>"#,
1925                req.request_id
1926            ),
1927            &req.request_id,
1928        ))
1929    }
1930
1931    fn add_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1932        let topic_arn = required(req, "TopicArn")?;
1933        let label = required(req, "Label")?;
1934
1935        // Parse AWSAccountId.member.N and ActionName.member.N
1936        let mut account_ids = Vec::new();
1937        for n in 1..=20 {
1938            let key = format!("AWSAccountId.member.{n}");
1939            if let Some(v) = req.query_params.get(&key) {
1940                account_ids.push(v.clone());
1941            } else {
1942                break;
1943            }
1944        }
1945
1946        let mut action_names = Vec::new();
1947        for n in 1..=20 {
1948            let key = format!("ActionName.member.{n}");
1949            if let Some(v) = req.query_params.get(&key) {
1950                action_names.push(v.clone());
1951            } else {
1952                break;
1953            }
1954        }
1955
1956        // Validate action names
1957        for action in &action_names {
1958            if !VALID_SNS_ACTIONS.contains(&action.as_str()) {
1959                return Err(AwsServiceError::aws_error(
1960                    StatusCode::BAD_REQUEST,
1961                    "InvalidParameter",
1962                    "Policy statement action out of service scope!",
1963                ));
1964            }
1965        }
1966
1967        let mut state = self.state.write();
1968        let account_id = state.account_id.clone();
1969        let topic = state
1970            .topics
1971            .get_mut(&topic_arn)
1972            .ok_or_else(|| not_found("Topic"))?;
1973
1974        // Get or create policy
1975        let policy_str = topic
1976            .attributes
1977            .get("Policy")
1978            .cloned()
1979            .unwrap_or_else(|| default_policy(&topic_arn, &account_id));
1980
1981        let mut policy: Value = serde_json::from_str(&policy_str)
1982            .or_else(|_| serde_json::from_str(&default_policy(&topic_arn, &account_id)))
1983            .map_err(|_| {
1984                AwsServiceError::aws_error(
1985                    StatusCode::INTERNAL_SERVER_ERROR,
1986                    "InternalError",
1987                    "Failed to parse topic policy",
1988                )
1989            })?;
1990
1991        // Check if statement with this label already exists
1992        if let Some(statements) = policy["Statement"].as_array() {
1993            for stmt in statements {
1994                if stmt["Sid"].as_str() == Some(&label) {
1995                    return Err(AwsServiceError::aws_error(
1996                        StatusCode::BAD_REQUEST,
1997                        "InvalidParameter",
1998                        "Statement already exists",
1999                    ));
2000                }
2001            }
2002        }
2003
2004        // Build principal
2005        let principal = if account_ids.len() == 1 {
2006            Value::String(Arn::global("iam", &account_ids[0], "root").to_string())
2007        } else {
2008            Value::Array(
2009                account_ids
2010                    .iter()
2011                    .map(|id| Value::String(Arn::global("iam", id, "root").to_string()))
2012                    .collect(),
2013            )
2014        };
2015
2016        // Build action
2017        let action = if action_names.len() == 1 {
2018            Value::String(format!("SNS:{}", action_names[0]))
2019        } else {
2020            Value::Array(
2021                action_names
2022                    .iter()
2023                    .map(|a| Value::String(format!("SNS:{}", a)))
2024                    .collect(),
2025            )
2026        };
2027
2028        let new_statement = serde_json::json!({
2029            "Sid": label,
2030            "Effect": "Allow",
2031            "Principal": {"AWS": principal},
2032            "Action": action,
2033            "Resource": topic_arn,
2034        });
2035
2036        if let Some(statements) = policy["Statement"].as_array_mut() {
2037            statements.push(new_statement);
2038        }
2039
2040        topic
2041            .attributes
2042            .insert("Policy".to_string(), policy.to_string());
2043
2044        Ok(xml_resp(
2045            &format!(
2046                r#"<AddPermissionResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2047  <ResponseMetadata>
2048    <RequestId>{}</RequestId>
2049  </ResponseMetadata>
2050</AddPermissionResponse>"#,
2051                req.request_id
2052            ),
2053            &req.request_id,
2054        ))
2055    }
2056
2057    fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2058        let topic_arn = required(req, "TopicArn")?;
2059        let label = required(req, "Label")?;
2060
2061        let mut state = self.state.write();
2062        let topic = state
2063            .topics
2064            .get_mut(&topic_arn)
2065            .ok_or_else(|| not_found("Topic"))?;
2066
2067        if let Some(policy_str) = topic.attributes.get("Policy").cloned() {
2068            if let Ok(mut policy) = serde_json::from_str::<Value>(&policy_str) {
2069                if let Some(statements) = policy["Statement"].as_array_mut() {
2070                    statements.retain(|s| s["Sid"].as_str() != Some(&label));
2071                }
2072                topic
2073                    .attributes
2074                    .insert("Policy".to_string(), policy.to_string());
2075            }
2076        }
2077
2078        Ok(xml_resp(
2079            &format!(
2080                r#"<RemovePermissionResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2081  <ResponseMetadata>
2082    <RequestId>{}</RequestId>
2083  </ResponseMetadata>
2084</RemovePermissionResponse>"#,
2085                req.request_id
2086            ),
2087            &req.request_id,
2088        ))
2089    }
2090
2091    // ===== Platform Application actions =====
2092
2093    fn create_platform_application(
2094        &self,
2095        req: &AwsRequest,
2096    ) -> Result<AwsResponse, AwsServiceError> {
2097        let name = required(req, "Name")?;
2098        let platform = required(req, "Platform")?;
2099        let attributes = parse_entries(req, "Attributes");
2100
2101        let mut state = self.state.write();
2102        let arn = format!(
2103            "arn:aws:sns:{}:{}:app/{}/{}",
2104            req.region, state.account_id, platform, name
2105        );
2106
2107        state.platform_applications.insert(
2108            arn.clone(),
2109            PlatformApplication {
2110                arn: arn.clone(),
2111                name,
2112                platform,
2113                attributes,
2114                endpoints: HashMap::new(),
2115            },
2116        );
2117
2118        Ok(xml_resp(
2119            &format!(
2120                r#"<CreatePlatformApplicationResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2121  <CreatePlatformApplicationResult>
2122    <PlatformApplicationArn>{arn}</PlatformApplicationArn>
2123  </CreatePlatformApplicationResult>
2124  <ResponseMetadata>
2125    <RequestId>{}</RequestId>
2126  </ResponseMetadata>
2127</CreatePlatformApplicationResponse>"#,
2128                req.request_id
2129            ),
2130            &req.request_id,
2131        ))
2132    }
2133
2134    fn delete_platform_application(
2135        &self,
2136        req: &AwsRequest,
2137    ) -> Result<AwsResponse, AwsServiceError> {
2138        let arn = required(req, "PlatformApplicationArn")?;
2139        self.state.write().platform_applications.remove(&arn);
2140
2141        Ok(xml_resp(
2142            &format!(
2143                r#"<DeletePlatformApplicationResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2144  <ResponseMetadata>
2145    <RequestId>{}</RequestId>
2146  </ResponseMetadata>
2147</DeletePlatformApplicationResponse>"#,
2148                req.request_id
2149            ),
2150            &req.request_id,
2151        ))
2152    }
2153
2154    fn get_platform_application_attributes(
2155        &self,
2156        req: &AwsRequest,
2157    ) -> Result<AwsResponse, AwsServiceError> {
2158        let arn = required(req, "PlatformApplicationArn")?;
2159        let state = self.state.read();
2160        let app = state
2161            .platform_applications
2162            .get(&arn)
2163            .ok_or_else(|| not_found("PlatformApplication"))?;
2164
2165        let attrs: String = app
2166            .attributes
2167            .iter()
2168            .map(|(k, v)| format_attr(k, v))
2169            .collect::<Vec<_>>()
2170            .join("\n");
2171
2172        Ok(xml_resp(
2173            &format!(
2174                r#"<GetPlatformApplicationAttributesResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2175  <GetPlatformApplicationAttributesResult>
2176    <Attributes>
2177{attrs}
2178    </Attributes>
2179  </GetPlatformApplicationAttributesResult>
2180  <ResponseMetadata>
2181    <RequestId>{}</RequestId>
2182  </ResponseMetadata>
2183</GetPlatformApplicationAttributesResponse>"#,
2184                req.request_id
2185            ),
2186            &req.request_id,
2187        ))
2188    }
2189
2190    fn set_platform_application_attributes(
2191        &self,
2192        req: &AwsRequest,
2193    ) -> Result<AwsResponse, AwsServiceError> {
2194        let arn = required(req, "PlatformApplicationArn")?;
2195        let new_attrs = parse_entries(req, "Attributes");
2196
2197        let mut state = self.state.write();
2198        let app = state
2199            .platform_applications
2200            .get_mut(&arn)
2201            .ok_or_else(|| not_found("PlatformApplication"))?;
2202
2203        for (k, v) in new_attrs {
2204            app.attributes.insert(k, v);
2205        }
2206
2207        Ok(xml_resp(
2208            &format!(
2209                r#"<SetPlatformApplicationAttributesResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2210  <ResponseMetadata>
2211    <RequestId>{}</RequestId>
2212  </ResponseMetadata>
2213</SetPlatformApplicationAttributesResponse>"#,
2214                req.request_id
2215            ),
2216            &req.request_id,
2217        ))
2218    }
2219
2220    fn list_platform_applications(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2221        let state = self.state.read();
2222
2223        let members: String = state
2224            .platform_applications
2225            .values()
2226            .map(|app| {
2227                let attrs: String = app
2228                    .attributes
2229                    .iter()
2230                    .map(|(k, v)| format_attr(k, v))
2231                    .collect::<Vec<_>>()
2232                    .join("\n");
2233                format!(
2234                    r#"      <member>
2235        <PlatformApplicationArn>{}</PlatformApplicationArn>
2236        <Attributes>
2237{attrs}
2238        </Attributes>
2239      </member>"#,
2240                    app.arn
2241                )
2242            })
2243            .collect::<Vec<_>>()
2244            .join("\n");
2245
2246        Ok(xml_resp(
2247            &format!(
2248                r#"<ListPlatformApplicationsResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2249  <ListPlatformApplicationsResult>
2250    <PlatformApplications>
2251{members}
2252    </PlatformApplications>
2253  </ListPlatformApplicationsResult>
2254  <ResponseMetadata>
2255    <RequestId>{}</RequestId>
2256  </ResponseMetadata>
2257</ListPlatformApplicationsResponse>"#,
2258                req.request_id
2259            ),
2260            &req.request_id,
2261        ))
2262    }
2263
2264    fn create_platform_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2265        let app_arn = required(req, "PlatformApplicationArn")?;
2266        let token = required(req, "Token")?;
2267        let custom_user_data = param(req, "CustomUserData");
2268        let attrs = parse_entries(req, "Attributes");
2269
2270        let mut state = self.state.write();
2271        let account_id = state.account_id.clone();
2272        let app = state
2273            .platform_applications
2274            .get_mut(&app_arn)
2275            .ok_or_else(|| not_found("PlatformApplication"))?;
2276
2277        // Check for existing endpoint with same token
2278        for (arn, ep) in &app.endpoints {
2279            if ep.token == token {
2280                // If attributes are different, check Enabled attribute
2281                let existing_enabled = ep
2282                    .attributes
2283                    .get("Enabled")
2284                    .cloned()
2285                    .unwrap_or_else(|| "true".to_string());
2286                let new_enabled = attrs
2287                    .get("Enabled")
2288                    .cloned()
2289                    .unwrap_or_else(|| "true".to_string());
2290                let custom_matches = match (&custom_user_data, ep.attributes.get("CustomUserData"))
2291                {
2292                    (Some(new), Some(old)) => new == old,
2293                    (None, None) => true,
2294                    (None, Some(_)) => true,
2295                    _ => false,
2296                };
2297
2298                if existing_enabled == new_enabled && custom_matches {
2299                    // Return existing endpoint
2300                    return Ok(xml_resp(
2301                        &format!(
2302                            r#"<CreatePlatformEndpointResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2303  <CreatePlatformEndpointResult>
2304    <EndpointArn>{arn}</EndpointArn>
2305  </CreatePlatformEndpointResult>
2306  <ResponseMetadata>
2307    <RequestId>{}</RequestId>
2308  </ResponseMetadata>
2309</CreatePlatformEndpointResponse>"#,
2310                            req.request_id
2311                        ),
2312                        &req.request_id,
2313                    ));
2314                } else {
2315                    return Err(AwsServiceError::aws_error(
2316                        StatusCode::BAD_REQUEST,
2317                        "InvalidParameter",
2318                        format!("Invalid parameter: Token Reason: Endpoint {} already exists with the same Token, but different attributes.", arn),
2319                    ));
2320                }
2321            }
2322        }
2323
2324        let endpoint_id = uuid::Uuid::new_v4().to_string().replace('-', "");
2325        let endpoint_arn = format!(
2326            "arn:aws:sns:{}:{}:endpoint/{}/{}/{}",
2327            req.region, account_id, app.platform, app.name, endpoint_id
2328        );
2329
2330        let mut endpoint_attrs = attrs;
2331        endpoint_attrs
2332            .entry("Enabled".to_string())
2333            .or_insert_with(|| "true".to_string());
2334        endpoint_attrs.insert("Token".to_string(), token.clone());
2335        if let Some(ref ud) = custom_user_data {
2336            endpoint_attrs
2337                .entry("CustomUserData".to_string())
2338                .or_insert_with(|| ud.clone());
2339        }
2340
2341        let enabled = endpoint_attrs
2342            .get("Enabled")
2343            .map(|v| v == "true")
2344            .unwrap_or(true);
2345
2346        app.endpoints.insert(
2347            endpoint_arn.clone(),
2348            PlatformEndpoint {
2349                arn: endpoint_arn.clone(),
2350                token,
2351                attributes: endpoint_attrs,
2352                enabled,
2353                messages: Vec::new(),
2354            },
2355        );
2356
2357        Ok(xml_resp(
2358            &format!(
2359                r#"<CreatePlatformEndpointResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2360  <CreatePlatformEndpointResult>
2361    <EndpointArn>{endpoint_arn}</EndpointArn>
2362  </CreatePlatformEndpointResult>
2363  <ResponseMetadata>
2364    <RequestId>{}</RequestId>
2365  </ResponseMetadata>
2366</CreatePlatformEndpointResponse>"#,
2367                req.request_id
2368            ),
2369            &req.request_id,
2370        ))
2371    }
2372
2373    fn delete_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2374        let endpoint_arn = required(req, "EndpointArn")?;
2375
2376        let mut state = self.state.write();
2377        for app in state.platform_applications.values_mut() {
2378            app.endpoints.remove(&endpoint_arn);
2379        }
2380
2381        Ok(xml_resp(
2382            &format!(
2383                r#"<DeleteEndpointResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2384  <ResponseMetadata>
2385    <RequestId>{}</RequestId>
2386  </ResponseMetadata>
2387</DeleteEndpointResponse>"#,
2388                req.request_id
2389            ),
2390            &req.request_id,
2391        ))
2392    }
2393
2394    fn get_endpoint_attributes(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2395        let endpoint_arn = required(req, "EndpointArn")?;
2396
2397        let state = self.state.read();
2398        for app in state.platform_applications.values() {
2399            if let Some(ep) = app.endpoints.get(&endpoint_arn) {
2400                let attrs: String = ep
2401                    .attributes
2402                    .iter()
2403                    .map(|(k, v)| format_attr(k, v))
2404                    .collect::<Vec<_>>()
2405                    .join("\n");
2406
2407                return Ok(xml_resp(
2408                    &format!(
2409                        r#"<GetEndpointAttributesResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2410  <GetEndpointAttributesResult>
2411    <Attributes>
2412{attrs}
2413    </Attributes>
2414  </GetEndpointAttributesResult>
2415  <ResponseMetadata>
2416    <RequestId>{}</RequestId>
2417  </ResponseMetadata>
2418</GetEndpointAttributesResponse>"#,
2419                        req.request_id
2420                    ),
2421                    &req.request_id,
2422                ));
2423            }
2424        }
2425
2426        Err(not_found("Endpoint"))
2427    }
2428
2429    fn set_endpoint_attributes(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2430        let endpoint_arn = required(req, "EndpointArn")?;
2431        let new_attrs = parse_entries(req, "Attributes");
2432
2433        let mut state = self.state.write();
2434        for app in state.platform_applications.values_mut() {
2435            if let Some(ep) = app.endpoints.get_mut(&endpoint_arn) {
2436                for (k, v) in new_attrs {
2437                    if k == "Enabled" {
2438                        ep.enabled = v == "true";
2439                    }
2440                    ep.attributes.insert(k, v);
2441                }
2442
2443                return Ok(xml_resp(
2444                    &format!(
2445                        r#"<SetEndpointAttributesResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2446  <ResponseMetadata>
2447    <RequestId>{}</RequestId>
2448  </ResponseMetadata>
2449</SetEndpointAttributesResponse>"#,
2450                        req.request_id
2451                    ),
2452                    &req.request_id,
2453                ));
2454            }
2455        }
2456
2457        Err(not_found("Endpoint"))
2458    }
2459
2460    fn list_endpoints_by_platform_application(
2461        &self,
2462        req: &AwsRequest,
2463    ) -> Result<AwsResponse, AwsServiceError> {
2464        let app_arn = required(req, "PlatformApplicationArn")?;
2465
2466        let state = self.state.read();
2467        let app = state
2468            .platform_applications
2469            .get(&app_arn)
2470            .ok_or_else(|| not_found("PlatformApplication"))?;
2471
2472        let members: String = app
2473            .endpoints
2474            .values()
2475            .map(|ep| {
2476                let attrs: String = ep
2477                    .attributes
2478                    .iter()
2479                    .map(|(k, v)| format_attr(k, v))
2480                    .collect::<Vec<_>>()
2481                    .join("\n");
2482                format!(
2483                    r#"      <member>
2484        <EndpointArn>{}</EndpointArn>
2485        <Attributes>
2486{attrs}
2487        </Attributes>
2488      </member>"#,
2489                    ep.arn
2490                )
2491            })
2492            .collect::<Vec<_>>()
2493            .join("\n");
2494
2495        Ok(xml_resp(
2496            &format!(
2497                r#"<ListEndpointsByPlatformApplicationResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2498  <ListEndpointsByPlatformApplicationResult>
2499    <Endpoints>
2500{members}
2501    </Endpoints>
2502  </ListEndpointsByPlatformApplicationResult>
2503  <ResponseMetadata>
2504    <RequestId>{}</RequestId>
2505  </ResponseMetadata>
2506</ListEndpointsByPlatformApplicationResponse>"#,
2507                req.request_id
2508            ),
2509            &req.request_id,
2510        ))
2511    }
2512
2513    // ===== SMS actions =====
2514
2515    fn set_sms_attributes(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2516        let attrs = parse_entries(req, "attributes");
2517
2518        let mut state = self.state.write();
2519        for (k, v) in attrs {
2520            state.sms_attributes.insert(k, v);
2521        }
2522
2523        Ok(xml_resp(
2524            &format!(
2525                r#"<SetSMSAttributesResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2526  <SetSMSAttributesResult/>
2527  <ResponseMetadata>
2528    <RequestId>{}</RequestId>
2529  </ResponseMetadata>
2530</SetSMSAttributesResponse>"#,
2531                req.request_id
2532            ),
2533            &req.request_id,
2534        ))
2535    }
2536
2537    fn get_sms_attributes(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2538        // Parse optional attribute name filter: attributes.member.N
2539        let mut filter_names = Vec::new();
2540        for n in 1..=50 {
2541            let key = format!("attributes.member.{n}");
2542            if let Some(name) = req.query_params.get(&key) {
2543                filter_names.push(name.clone());
2544            } else {
2545                break;
2546            }
2547        }
2548
2549        let state = self.state.read();
2550
2551        let attrs: String = state
2552            .sms_attributes
2553            .iter()
2554            .filter(|(k, _)| filter_names.is_empty() || filter_names.contains(k))
2555            .map(|(k, v)| format_attr(k, v))
2556            .collect::<Vec<_>>()
2557            .join("\n");
2558
2559        Ok(xml_resp(
2560            &format!(
2561                r#"<GetSMSAttributesResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2562  <GetSMSAttributesResult>
2563    <attributes>
2564{attrs}
2565    </attributes>
2566  </GetSMSAttributesResult>
2567  <ResponseMetadata>
2568    <RequestId>{}</RequestId>
2569  </ResponseMetadata>
2570</GetSMSAttributesResponse>"#,
2571                req.request_id
2572            ),
2573            &req.request_id,
2574        ))
2575    }
2576
2577    fn check_if_phone_number_is_opted_out(
2578        &self,
2579        req: &AwsRequest,
2580    ) -> Result<AwsResponse, AwsServiceError> {
2581        let phone_number = required(req, "phoneNumber")?;
2582
2583        // Validate phone number format (E.164)
2584        let valid = phone_number.starts_with('+')
2585            && phone_number.len() >= 2
2586            && phone_number[1..].chars().all(|c| c.is_ascii_digit());
2587        if !valid {
2588            return Err(AwsServiceError::aws_error(
2589                StatusCode::BAD_REQUEST,
2590                "InvalidParameter",
2591                format!(
2592                    "Invalid parameter: PhoneNumber Reason: {phone_number} does not meet the E164 format"
2593                ),
2594            ));
2595        }
2596
2597        let state = self.state.read();
2598        // Numbers ending in 99 are considered opted out by convention
2599        let is_opted_out =
2600            state.opted_out_numbers.contains(&phone_number) || phone_number.ends_with("99");
2601
2602        Ok(xml_resp(
2603            &format!(
2604                r#"<CheckIfPhoneNumberIsOptedOutResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2605  <CheckIfPhoneNumberIsOptedOutResult>
2606    <isOptedOut>{is_opted_out}</isOptedOut>
2607  </CheckIfPhoneNumberIsOptedOutResult>
2608  <ResponseMetadata>
2609    <RequestId>{}</RequestId>
2610  </ResponseMetadata>
2611</CheckIfPhoneNumberIsOptedOutResponse>"#,
2612                req.request_id
2613            ),
2614            &req.request_id,
2615        ))
2616    }
2617
2618    fn list_phone_numbers_opted_out(
2619        &self,
2620        req: &AwsRequest,
2621    ) -> Result<AwsResponse, AwsServiceError> {
2622        let state = self.state.read();
2623        let members: String = state
2624            .opted_out_numbers
2625            .iter()
2626            .map(|n| format!("      <member>{n}</member>"))
2627            .collect::<Vec<_>>()
2628            .join("\n");
2629
2630        Ok(xml_resp(
2631            &format!(
2632                r#"<ListPhoneNumbersOptedOutResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2633  <ListPhoneNumbersOptedOutResult>
2634    <phoneNumbers>
2635{members}
2636    </phoneNumbers>
2637  </ListPhoneNumbersOptedOutResult>
2638  <ResponseMetadata>
2639    <RequestId>{}</RequestId>
2640  </ResponseMetadata>
2641</ListPhoneNumbersOptedOutResponse>"#,
2642                req.request_id
2643            ),
2644            &req.request_id,
2645        ))
2646    }
2647
2648    fn opt_in_phone_number(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2649        let phone_number = required(req, "phoneNumber")?;
2650        let mut state = self.state.write();
2651        state.opted_out_numbers.retain(|n| n != &phone_number);
2652
2653        Ok(xml_resp(
2654            &format!(
2655                r#"<OptInPhoneNumberResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
2656  <OptInPhoneNumberResult/>
2657  <ResponseMetadata>
2658    <RequestId>{}</RequestId>
2659  </ResponseMetadata>
2660</OptInPhoneNumberResponse>"#,
2661                req.request_id
2662            ),
2663            &req.request_id,
2664        ))
2665    }
2666}
2667
/// Inputs to `build_sns_lambda_event` — one SNS message delivered to one Lambda subscription.
///
/// Every field is borrowed; the builder copies what it needs into the JSON payload.
pub(crate) struct SnsLambdaEventInput<'a> {
    /// Emitted as `Sns.MessageId`.
    pub message_id: &'a str,
    /// Emitted as `Sns.TopicArn`.
    pub topic_arn: &'a str,
    /// Emitted as `EventSubscriptionArn` and embedded in `Sns.UnsubscribeUrl`.
    pub subscription_arn: &'a str,
    /// Emitted as `Sns.Message`.
    pub message: &'a str,
    /// Emitted as `Sns.Subject`; `None` is rendered as an empty string.
    pub subject: Option<&'a str>,
    /// Emitted verbatim as `Sns.MessageAttributes`.
    pub message_attributes: &'a serde_json::Map<String, Value>,
    /// Emitted as `Sns.Timestamp` in RFC 3339 form.
    pub timestamp: &'a chrono::DateTime<Utc>,
    /// Base URL used as the prefix of `Sns.UnsubscribeUrl`.
    pub endpoint: &'a str,
}
2679
2680/// Build an SNS Lambda event payload (matches real AWS format).
2681/// Used by both direct Publish and cross-service delivery.
2682pub(crate) fn build_sns_lambda_event(input: &SnsLambdaEventInput<'_>) -> String {
2683    let sns_event = serde_json::json!({
2684        "Records": [{
2685            "EventVersion": "1.0",
2686            "EventSubscriptionArn": input.subscription_arn,
2687            "EventSource": "aws:sns",
2688            "Sns": {
2689                "SignatureVersion": "1",
2690                "Timestamp": input.timestamp.to_rfc3339(),
2691                "Signature": "FAKE_SIGNATURE",
2692                "SigningCertUrl": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-0000000000000000000000.pem",
2693                "MessageId": input.message_id,
2694                "Message": input.message,
2695                "MessageAttributes": input.message_attributes,
2696                "Type": "Notification",
2697                "UnsubscribeUrl": format!("{}/?Action=Unsubscribe&SubscriptionArn={}", input.endpoint, input.subscription_arn),
2698                "TopicArn": input.topic_arn,
2699                "Subject": input.subject.unwrap_or(""),
2700            }
2701        }]
2702    });
2703    sns_event.to_string()
2704}
2705
2706/// Build an SNS notification envelope as JSON string.
2707/// Subject and MessageAttributes are only included when present.
2708fn build_sns_envelope(
2709    message_id: &str,
2710    topic_arn: &str,
2711    subject: &Option<String>,
2712    message: &str,
2713    message_attributes: &serde_json::Map<String, Value>,
2714    endpoint: &str,
2715) -> String {
2716    let mut map = serde_json::Map::new();
2717    map.insert(
2718        "Type".to_string(),
2719        Value::String("Notification".to_string()),
2720    );
2721    map.insert(
2722        "MessageId".to_string(),
2723        Value::String(message_id.to_string()),
2724    );
2725    map.insert("TopicArn".to_string(), Value::String(topic_arn.to_string()));
2726    if let Some(ref subj) = subject {
2727        map.insert("Subject".to_string(), Value::String(subj.clone()));
2728    }
2729    map.insert("Message".to_string(), Value::String(message.to_string()));
2730    map.insert(
2731        "Timestamp".to_string(),
2732        Value::String(Utc::now().to_rfc3339()),
2733    );
2734    map.insert(
2735        "SignatureVersion".to_string(),
2736        Value::String("1".to_string()),
2737    );
2738    map.insert(
2739        "Signature".to_string(),
2740        Value::String("FAKE_SIGNATURE".to_string()),
2741    );
2742    map.insert(
2743        "SigningCertURL".to_string(),
2744        Value::String("https://sns.us-east-1.amazonaws.com/SimpleNotificationService-0000000000000000000000.pem".to_string()),
2745    );
2746    map.insert(
2747        "UnsubscribeURL".to_string(),
2748        Value::String(format!(
2749            "{}/?Action=Unsubscribe&SubscriptionArn={}",
2750            endpoint, topic_arn
2751        )),
2752    );
2753    if !message_attributes.is_empty() {
2754        map.insert(
2755            "MessageAttributes".to_string(),
2756            Value::Object(message_attributes.clone()),
2757        );
2758    }
2759    Value::Object(map).to_string()
2760}
2761
/// Render one attribute as an XML `<entry>` with `<key>` and `<value>` children.
///
/// Both `name` and `value` are XML-escaped (`&`, `<`, `>`), so attribute
/// values containing markup characters cannot produce malformed XML or
/// inject elements into the response. Text without those characters is
/// emitted unchanged.
fn format_attr(name: &str, value: &str) -> String {
    /// Escape the characters that are not allowed verbatim in XML text content.
    fn escape_xml_text(text: &str) -> String {
        let mut escaped = String::with_capacity(text.len());
        for ch in text.chars() {
            match ch {
                '&' => escaped.push_str("&amp;"),
                '<' => escaped.push_str("&lt;"),
                '>' => escaped.push_str("&gt;"),
                other => escaped.push(other),
            }
        }
        escaped
    }

    let name = escape_xml_text(name);
    let value = escape_xml_text(value);
    format!("      <entry><key>{name}</key><value>{value}</value></entry>")
}
2765
2766fn format_sub_member(sub: &SnsSubscription) -> String {
2767    let display_arn = if sub.confirmed {
2768        &sub.subscription_arn
2769    } else {
2770        "PendingConfirmation"
2771    };
2772    format!(
2773        r#"      <member>
2774        <SubscriptionArn>{}</SubscriptionArn>
2775        <TopicArn>{}</TopicArn>
2776        <Protocol>{}</Protocol>
2777        <Endpoint>{}</Endpoint>
2778        <Owner>{}</Owner>
2779      </member>"#,
2780        display_arn, sub.topic_arn, sub.protocol, sub.endpoint, sub.owner,
2781    )
2782}
2783
/// Subscribers of a topic, grouped by protocol. Returned by
/// `collect_topic_subscribers` so the fan-out loop doesn't have to
/// re-filter the subscriptions map five times inline.
struct TopicSubscribers {
    /// (queue_arn, raw_message_delivery)
    sqs: Vec<(String, bool)>,
    /// Endpoints of `http` and `https` subscriptions.
    http: Vec<String>,
    /// (function_arn, subscription_arn)
    lambda: Vec<(String, String)>,
    /// Endpoints of `email` and `email-json` subscriptions.
    email: Vec<String>,
    /// Endpoints of `sms` subscriptions.
    sms: Vec<String>,
}
2799
/// Read-only state passed down the fan-out helpers so each helper has
/// the same data the monolithic publish() used to reference inline.
///
/// NOTE(review): the helpers that consume this struct are outside this
/// view; the field notes below are inferred from names and types only.
struct TopicFanoutContext<'a> {
    // Presumably the id of the message being fanned out — confirm.
    msg_id: &'a str,
    topic_arn: &'a str,
    subject: Option<&'a str>,
    // Presumably the server base URL used for unsubscribe links — confirm.
    endpoint: &'a str,
    // Presumably the per-protocol message bodies — confirm against publish().
    sqs_message: &'a str,
    default_message: &'a str,
    // JSON-object form of the attributes (same type `build_envelope_attrs` returns).
    envelope_attrs: &'a serde_json::Map<String, Value>,
    // Typed form of the message attributes.
    message_attributes: &'a HashMap<String, MessageAttribute>,
    // Presumably the FIFO group / deduplication ids — confirm.
    message_group_id: Option<&'a str>,
    message_dedup_id: Option<&'a str>,
}
2814
2815fn collect_topic_subscribers(
2816    state: &crate::state::SnsState,
2817    topic_arn: &str,
2818    message_attributes: &HashMap<String, MessageAttribute>,
2819    message: &str,
2820) -> TopicSubscribers {
2821    let confirmed_for_topic = |s: &&SnsSubscription| {
2822        s.topic_arn == topic_arn
2823            && s.confirmed
2824            && matches_filter_policy(s, message_attributes, message)
2825    };
2826
2827    let sqs = state
2828        .subscriptions
2829        .values()
2830        .filter(|s| s.protocol == "sqs")
2831        .filter(confirmed_for_topic)
2832        .map(|s| {
2833            let raw = s
2834                .attributes
2835                .get("RawMessageDelivery")
2836                .map(|v| v == "true")
2837                .unwrap_or(false);
2838            (s.endpoint.clone(), raw)
2839        })
2840        .collect();
2841
2842    let http = state
2843        .subscriptions
2844        .values()
2845        .filter(|s| s.protocol == "http" || s.protocol == "https")
2846        .filter(confirmed_for_topic)
2847        .map(|s| s.endpoint.clone())
2848        .collect();
2849
2850    let lambda = state
2851        .subscriptions
2852        .values()
2853        .filter(|s| s.protocol == "lambda")
2854        .filter(confirmed_for_topic)
2855        .map(|s| (s.endpoint.clone(), s.subscription_arn.clone()))
2856        .collect();
2857
2858    let email = state
2859        .subscriptions
2860        .values()
2861        .filter(|s| s.protocol == "email" || s.protocol == "email-json")
2862        .filter(confirmed_for_topic)
2863        .map(|s| s.endpoint.clone())
2864        .collect();
2865
2866    let sms = state
2867        .subscriptions
2868        .values()
2869        .filter(|s| s.protocol == "sms")
2870        .filter(confirmed_for_topic)
2871        .map(|s| s.endpoint.clone())
2872        .collect();
2873
2874    TopicSubscribers {
2875        sqs,
2876        http,
2877        lambda,
2878        email,
2879        sms,
2880    }
2881}
2882
2883/// Build the `MessageAttributes` object used inside an SNS notification
2884/// envelope from the typed SNS message attributes.
2885fn build_envelope_attrs(
2886    message_attributes: &HashMap<String, MessageAttribute>,
2887) -> serde_json::Map<String, Value> {
2888    let mut envelope_attrs = serde_json::Map::new();
2889    for (key, attr) in message_attributes {
2890        let mut attr_obj = serde_json::Map::new();
2891        attr_obj.insert("Type".to_string(), Value::String(attr.data_type.clone()));
2892        if let Some(ref sv) = attr.string_value {
2893            attr_obj.insert("Value".to_string(), Value::String(sv.clone()));
2894        }
2895        if let Some(ref bv) = attr.binary_value {
2896            attr_obj.insert(
2897                "Value".to_string(),
2898                Value::String(base64::engine::general_purpose::STANDARD.encode(bv)),
2899            );
2900        }
2901        envelope_attrs.insert(key.clone(), Value::Object(attr_obj));
2902    }
2903    envelope_attrs
2904}
2905
2906fn parse_message_attributes(req: &AwsRequest) -> HashMap<String, MessageAttribute> {
2907    let mut attrs = HashMap::new();
2908    for n in 1..=10 {
2909        let name_key = format!("MessageAttributes.entry.{n}.Name");
2910        let data_type_key = format!("MessageAttributes.entry.{n}.Value.DataType");
2911        if let (Some(name), Some(data_type)) = (
2912            req.query_params.get(&name_key),
2913            req.query_params.get(&data_type_key),
2914        ) {
2915            let string_value_key = format!("MessageAttributes.entry.{n}.Value.StringValue");
2916            let string_value = req.query_params.get(&string_value_key).cloned();
2917            let binary_value_key = format!("MessageAttributes.entry.{n}.Value.BinaryValue");
2918            let binary_value = req
2919                .query_params
2920                .get(&binary_value_key)
2921                .and_then(|b| base64::engine::general_purpose::STANDARD.decode(b).ok());
2922            attrs.insert(
2923                name.clone(),
2924                MessageAttribute {
2925                    data_type: data_type.clone(),
2926                    string_value,
2927                    binary_value,
2928                },
2929            );
2930        } else {
2931            break;
2932        }
2933    }
2934    attrs
2935}
2936
2937/// Parse MessageAttributes for a specific PublishBatch entry.
2938/// Format: PublishBatchRequestEntries.member.M.MessageAttributes.entry.N.Name/...
2939fn parse_batch_message_attributes(
2940    req: &AwsRequest,
2941    member_idx: usize,
2942) -> HashMap<String, MessageAttribute> {
2943    let mut attrs = HashMap::new();
2944    for n in 1..=10 {
2945        let prefix =
2946            format!("PublishBatchRequestEntries.member.{member_idx}.MessageAttributes.entry.{n}");
2947        let name_key = format!("{prefix}.Name");
2948        let data_type_key = format!("{prefix}.Value.DataType");
2949        if let (Some(name), Some(data_type)) = (
2950            req.query_params.get(&name_key),
2951            req.query_params.get(&data_type_key),
2952        ) {
2953            let sv_key = format!("{prefix}.Value.StringValue");
2954            let string_value = req.query_params.get(&sv_key).cloned();
2955            let bv_key = format!("{prefix}.Value.BinaryValue");
2956            let binary_value = req
2957                .query_params
2958                .get(&bv_key)
2959                .and_then(|b| base64::engine::general_purpose::STANDARD.decode(b).ok());
2960            attrs.insert(
2961                name.clone(),
2962                MessageAttribute {
2963                    data_type: data_type.clone(),
2964                    string_value,
2965                    binary_value,
2966                },
2967            );
2968        } else {
2969            break;
2970        }
2971    }
2972    attrs
2973}
2974
2975/// Parse tags from query params.
2976/// Format: Tags.member.N.Key / Tags.member.N.Value
2977fn parse_tags(req: &AwsRequest) -> Vec<(String, String)> {
2978    let mut tags = Vec::new();
2979    for n in 1..=100 {
2980        let key_param = format!("Tags.member.{n}.Key");
2981        let val_param = format!("Tags.member.{n}.Value");
2982        if let Some(key) = req.query_params.get(&key_param) {
2983            let value = req
2984                .query_params
2985                .get(&val_param)
2986                .cloned()
2987                .unwrap_or_default();
2988            tags.push((key.clone(), value));
2989        } else {
2990            break;
2991        }
2992    }
2993    tags
2994}
2995
2996/// Parse tag keys for UntagResource.
2997/// Format: TagKeys.member.N
2998fn parse_tag_keys(req: &AwsRequest) -> Vec<String> {
2999    let mut keys = Vec::new();
3000    for n in 1..=50 {
3001        let key_param = format!("TagKeys.member.{n}");
3002        if let Some(key) = req.query_params.get(&key_param) {
3003            keys.push(key.clone());
3004        } else {
3005            break;
3006        }
3007    }
3008    keys
3009}
3010
3011/// Parse Attributes.entry.N.key/value pairs (used by CreateTopic, Subscribe, etc.)
3012fn parse_entries(req: &AwsRequest, prefix: &str) -> HashMap<String, String> {
3013    let mut attrs = HashMap::new();
3014    for n in 1..=50 {
3015        let key_param = format!("{prefix}.entry.{n}.key");
3016        let val_param = format!("{prefix}.entry.{n}.value");
3017        if let Some(key) = req.query_params.get(&key_param) {
3018            let value = req
3019                .query_params
3020                .get(&val_param)
3021                .cloned()
3022                .unwrap_or_default();
3023            attrs.insert(key.clone(), value);
3024        } else {
3025            break;
3026        }
3027    }
3028    attrs
3029}
3030
3031/// Validate SMS phone number
3032fn validate_sms_endpoint(endpoint: &str) -> Result<(), AwsServiceError> {
3033    // Allow formats like +15551234567 and +15/55-123.4567
3034    if endpoint.is_empty() {
3035        return Err(AwsServiceError::aws_error(
3036            StatusCode::BAD_REQUEST,
3037            "InvalidParameter",
3038            "Invalid parameter: Endpoint",
3039        ));
3040    }
3041
3042    // Must start with optional + and contain only digits, -, /, .
3043    let stripped = endpoint.strip_prefix('+').unwrap_or(endpoint);
3044    if stripped.is_empty() {
3045        return Err(AwsServiceError::aws_error(
3046            StatusCode::BAD_REQUEST,
3047            "InvalidParameter",
3048            format!("Invalid SMS endpoint: {endpoint}"),
3049        ));
3050    }
3051
3052    // Check for invalid patterns: consecutive special chars, must start with + or digit
3053    if !endpoint.starts_with('+') && !endpoint.starts_with(|c: char| c.is_ascii_digit()) {
3054        return Err(AwsServiceError::aws_error(
3055            StatusCode::BAD_REQUEST,
3056            "InvalidParameter",
3057            format!("Invalid SMS endpoint: {endpoint}"),
3058        ));
3059    }
3060
3061    // Must not end with a special char
3062    if endpoint.ends_with('.') || endpoint.ends_with('-') || endpoint.ends_with('/') {
3063        return Err(AwsServiceError::aws_error(
3064            StatusCode::BAD_REQUEST,
3065            "InvalidParameter",
3066            format!("Invalid SMS endpoint: {endpoint}"),
3067        ));
3068    }
3069
3070    // Must not have consecutive special chars like --
3071    let chars: Vec<char> = endpoint.chars().collect();
3072    for i in 0..chars.len() - 1 {
3073        let c = chars[i];
3074        let next = chars[i + 1];
3075        if (c == '-' || c == '/' || c == '.') && (next == '-' || next == '/' || next == '.') {
3076            return Err(AwsServiceError::aws_error(
3077                StatusCode::BAD_REQUEST,
3078                "InvalidParameter",
3079                format!("Invalid SMS endpoint: {endpoint}"),
3080            ));
3081        }
3082    }
3083
3084    // Check all chars are valid
3085    for c in stripped.chars() {
3086        if !c.is_ascii_digit() && c != '-' && c != '/' && c != '.' {
3087            return Err(AwsServiceError::aws_error(
3088                StatusCode::BAD_REQUEST,
3089                "InvalidParameter",
3090                format!("Invalid SMS endpoint: {endpoint}"),
3091            ));
3092        }
3093    }
3094
3095    Ok(())
3096}
3097
3098/// Check if a message's attributes match the subscription's FilterPolicy.
3099fn matches_filter_policy(
3100    sub: &SnsSubscription,
3101    message_attributes: &HashMap<String, MessageAttribute>,
3102    message_body: &str,
3103) -> bool {
3104    let filter_json = match sub.attributes.get("FilterPolicy") {
3105        Some(fp) if !fp.is_empty() => fp,
3106        _ => return true,
3107    };
3108
3109    let filter: HashMap<String, Value> = match serde_json::from_str(filter_json) {
3110        Ok(f) => f,
3111        Err(_) => return false,
3112    };
3113
3114    let scope = sub
3115        .attributes
3116        .get("FilterPolicyScope")
3117        .map(|s| s.as_str())
3118        .unwrap_or("MessageAttributes");
3119
3120    if scope == "MessageBody" {
3121        return matches_filter_policy_body(&filter, message_body);
3122    }
3123
3124    // MessageAttributes scope
3125    for (attr_name, allowed_values) in &filter {
3126        // Handle $or operator
3127        if attr_name == "$or" {
3128            if let Some(or_conditions) = allowed_values.as_array() {
3129                let any_match = or_conditions.iter().any(|condition| {
3130                    if let Some(cond_obj) = condition.as_object() {
3131                        let cond_map: HashMap<String, Value> = cond_obj
3132                            .iter()
3133                            .map(|(k, v)| (k.clone(), v.clone()))
3134                            .collect();
3135                        // Each condition in $or is a mini filter policy
3136                        cond_map.iter().all(|(key, vals)| {
3137                            if let Some(arr) = vals.as_array() {
3138                                if let Some(msg_attr) = message_attributes.get(key) {
3139                                    let val = msg_attr.string_value.as_deref().unwrap_or("");
3140                                    check_filter_values(arr, val)
3141                                } else {
3142                                    false
3143                                }
3144                            } else {
3145                                false
3146                            }
3147                        })
3148                    } else {
3149                        false
3150                    }
3151                });
3152                if !any_match {
3153                    return false;
3154                }
3155                continue;
3156            }
3157        }
3158
3159        let allowed = match allowed_values.as_array() {
3160            Some(arr) => arr,
3161            None => continue,
3162        };
3163
3164        let msg_attr = match message_attributes.get(attr_name) {
3165            Some(a) => a,
3166            None => {
3167                let has_exists_false = allowed.iter().any(|v| {
3168                    v.as_object()
3169                        .and_then(|o| o.get("exists"))
3170                        .and_then(|e| e.as_bool())
3171                        == Some(false)
3172                });
3173                if has_exists_false {
3174                    continue;
3175                }
3176                return false;
3177            }
3178        };
3179
3180        let attr_value = msg_attr.string_value.as_deref().unwrap_or("");
3181        let is_numeric_type = msg_attr.data_type == "Number";
3182
3183        // Handle String.Array data type: parse the JSON array and check each element
3184        if msg_attr.data_type.starts_with("String.Array") || msg_attr.data_type == "String.Array" {
3185            if let Ok(arr) = serde_json::from_str::<Vec<Value>>(attr_value) {
3186                let any_match = arr.iter().any(|elem| {
3187                    let elem_str = match elem {
3188                        Value::String(s) => s.clone(),
3189                        Value::Number(n) => n.to_string(),
3190                        _ => elem.to_string(),
3191                    };
3192                    check_filter_values(allowed, &elem_str)
3193                });
3194                if !any_match {
3195                    return false;
3196                }
3197                continue;
3198            }
3199        }
3200
3201        let matched = check_filter_values_typed(allowed, attr_value, Some(is_numeric_type));
3202        if !matched {
3203            return false;
3204        }
3205    }
3206
3207    true
3208}
3209
3210/// Match filter policy against message body (JSON)
3211fn matches_filter_policy_body(filter: &HashMap<String, Value>, message_body: &str) -> bool {
3212    let body: Value = match serde_json::from_str(message_body) {
3213        Ok(v) => v,
3214        Err(_) => return false,
3215    };
3216
3217    matches_filter_policy_nested(filter, &body)
3218}
3219
3220fn matches_filter_policy_nested(filter: &HashMap<String, Value>, body: &Value) -> bool {
3221    let body_obj = match body.as_object() {
3222        Some(o) => o,
3223        None => return false,
3224    };
3225
3226    for (key, filter_value) in filter {
3227        let body_value = match body_obj.get(key) {
3228            Some(v) => v,
3229            None => {
3230                // Check for exists: false
3231                if let Some(arr) = filter_value.as_array() {
3232                    let has_exists_false = arr.iter().any(|v| {
3233                        v.as_object()
3234                            .and_then(|o| o.get("exists"))
3235                            .and_then(|e| e.as_bool())
3236                            == Some(false)
3237                    });
3238                    if has_exists_false {
3239                        continue;
3240                    }
3241                }
3242                return false;
3243            }
3244        };
3245
3246        if let Some(arr) = filter_value.as_array() {
3247            // This is a leaf filter: check the value
3248            // If the body value is an array, check if ANY element matches
3249            if let Some(body_arr) = body_value.as_array() {
3250                let any_match = body_arr.iter().any(|elem| {
3251                    let is_elem_numeric = elem.is_number();
3252                    let elem_str = match elem {
3253                        Value::String(s) => s.clone(),
3254                        Value::Number(n) => n.to_string(),
3255                        Value::Bool(b) => b.to_string(),
3256                        Value::Null => "null".to_string(),
3257                        _ => elem.to_string(),
3258                    };
3259                    check_filter_values_typed(arr, &elem_str, Some(is_elem_numeric))
3260                });
3261                if !any_match {
3262                    return false;
3263                }
3264            } else {
3265                let is_body_numeric = body_value.is_number();
3266                let value_str = match body_value {
3267                    Value::String(s) => s.clone(),
3268                    Value::Number(n) => n.to_string(),
3269                    Value::Bool(b) => b.to_string(),
3270                    Value::Null => "null".to_string(),
3271                    _ => body_value.to_string(),
3272                };
3273                if !check_filter_values_typed(arr, &value_str, Some(is_body_numeric)) {
3274                    return false;
3275                }
3276            }
3277        } else if let Some(nested_filter) = filter_value.as_object() {
3278            // Nested filter: recurse
3279            let nested_map: HashMap<String, Value> = nested_filter
3280                .iter()
3281                .map(|(k, v)| (k.clone(), v.clone()))
3282                .collect();
3283            // If body_value is an array, check if ANY element matches
3284            if let Some(body_arr) = body_value.as_array() {
3285                let any_match = body_arr
3286                    .iter()
3287                    .any(|elem| matches_filter_policy_nested(&nested_map, elem));
3288                if !any_match {
3289                    return false;
3290                }
3291            } else if !matches_filter_policy_nested(&nested_map, body_value) {
3292                return false;
3293            }
3294        }
3295    }
3296
3297    true
3298}
3299
/// Untyped filter check: delegates to `check_filter_values_typed` with
/// `is_numeric = None`, so both string and numeric comparisons are allowed.
/// The callers visible in this part of the file are the `$or` branches and
/// the `String.Array` element check in `matches_filter_policy`.
fn check_filter_values(allowed: &[Value], attr_value: &str) -> bool {
    check_filter_values_typed(allowed, attr_value, None)
}
3305
3306/// Type-aware filter check. When `is_numeric` is Some(true), only Number filters match.
3307/// When Some(false), only String filters match. When None, both match (original behavior).
3308fn check_filter_values_typed(
3309    allowed: &[Value],
3310    attr_value: &str,
3311    is_numeric: Option<bool>,
3312) -> bool {
3313    allowed.iter().any(|v| match v {
3314        Value::String(s) => {
3315            // If we know the attribute is numeric, string filters don't match
3316            if is_numeric == Some(true) {
3317                false
3318            } else {
3319                s == attr_value
3320            }
3321        }
3322        Value::Number(n) => {
3323            // If we know the attribute is a string, number filters don't match
3324            if is_numeric == Some(false) {
3325                return false;
3326            }
3327            if let Ok(attr_num) = attr_value.parse::<f64>() {
3328                if let Some(filter_num) = n.as_f64() {
3329                    numbers_equal(attr_num, filter_num)
3330                } else {
3331                    false
3332                }
3333            } else {
3334                false
3335            }
3336        }
3337        Value::Bool(_) | Value::Null => false,
3338        Value::Object(obj) => {
3339            if let Some(prefix) = obj.get("prefix").and_then(|v| v.as_str()) {
3340                attr_value.starts_with(prefix)
3341            } else if let Some(suffix) = obj.get("suffix").and_then(|v| v.as_str()) {
3342                attr_value.ends_with(suffix)
3343            } else if let Some(anything_but) = obj.get("anything-but") {
3344                match anything_but {
3345                    Value::String(s) => {
3346                        // String anything-but only excludes string-type attrs
3347                        if is_numeric == Some(true) {
3348                            true
3349                        } else {
3350                            attr_value != s
3351                        }
3352                    }
3353                    Value::Number(n) => {
3354                        // Number anything-but only excludes number-type attrs
3355                        if is_numeric == Some(false) {
3356                            return true;
3357                        }
3358                        if let Ok(attr_num) = attr_value.parse::<f64>() {
3359                            if let Some(filter_num) = n.as_f64() {
3360                                (attr_num - filter_num).abs() >= f64::EPSILON
3361                            } else {
3362                                true
3363                            }
3364                        } else {
3365                            true
3366                        }
3367                    }
3368                    Value::Array(arr) => {
3369                        // anything-but with array: type must match for exclusion
3370                        !arr.iter().any(|av| match av {
3371                            Value::String(s) => {
3372                                if is_numeric == Some(true) {
3373                                    false
3374                                } else {
3375                                    s == attr_value
3376                                }
3377                            }
3378                            Value::Number(n) => {
3379                                if is_numeric == Some(false) {
3380                                    return false;
3381                                }
3382                                if let Ok(attr_num) = attr_value.parse::<f64>() {
3383                                    if let Some(filter_num) = n.as_f64() {
3384                                        numbers_equal(attr_num, filter_num)
3385                                    } else {
3386                                        false
3387                                    }
3388                                } else {
3389                                    false
3390                                }
3391                            }
3392                            _ => false,
3393                        })
3394                    }
3395                    Value::Object(inner) => {
3396                        // anything-but with prefix
3397                        if let Some(prefix) = inner.get("prefix").and_then(|v| v.as_str()) {
3398                            !attr_value.starts_with(prefix)
3399                        } else if let Some(suffix) = inner.get("suffix").and_then(|v| v.as_str()) {
3400                            !attr_value.ends_with(suffix)
3401                        } else {
3402                            false
3403                        }
3404                    }
3405                    _ => false,
3406                }
3407            } else if let Some(numeric_arr) = obj.get("numeric").and_then(|v| v.as_array()) {
3408                let attr_num: f64 = match attr_value.parse() {
3409                    Ok(n) => n,
3410                    Err(_) => return false,
3411                };
3412                matches_numeric_filter(attr_num, numeric_arr)
3413            } else if let Some(eq_ignore_case) =
3414                obj.get("equals-ignore-case").and_then(|v| v.as_str())
3415            {
3416                attr_value.eq_ignore_ascii_case(eq_ignore_case)
3417            } else {
3418                // {"exists": true/false}
3419                obj.get("exists")
3420                    .and_then(|v| v.as_bool())
3421                    .unwrap_or_default()
3422            }
3423        }
3424        _ => false,
3425    })
3426}
3427
/// Approximate equality for two f64 values, accurate to roughly 5 decimal places.
/// AWS SNS uses limited precision for number comparisons.
fn numbers_equal(a: f64, b: f64) -> bool {
    // Absolute tolerance giving ~5 decimal digits of precision.
    const TOLERANCE: f64 = 1e-5;
    let difference = (a - b).abs();
    difference < TOLERANCE
}
3434
3435/// Evaluate a numeric filter
3436fn matches_numeric_filter(value: f64, conditions: &[Value]) -> bool {
3437    let mut i = 0;
3438    while i + 1 < conditions.len() {
3439        let op = match conditions[i].as_str() {
3440            Some(s) => s,
3441            None => return false,
3442        };
3443        let threshold = match conditions[i + 1].as_f64() {
3444            Some(n) => n,
3445            None => return false,
3446        };
3447        let passes = match op {
3448            "=" => numbers_equal(value, threshold),
3449            ">" => value > threshold,
3450            ">=" => value >= threshold,
3451            "<" => value < threshold,
3452            "<=" => value <= threshold,
3453            _ => return false,
3454        };
3455        if !passes {
3456            return false;
3457        }
3458        i += 2;
3459    }
3460    true
3461}
3462
3463/// Validate a filter policy JSON string.
3464fn validate_filter_policy(policy_str: &str) -> Result<(), AwsServiceError> {
3465    let policy: HashMap<String, Value> = serde_json::from_str(policy_str).map_err(|_| {
3466        AwsServiceError::aws_error(
3467            StatusCode::BAD_REQUEST,
3468            "InvalidParameter",
3469            "Invalid parameter: FilterPolicy: failed to parse JSON.",
3470        )
3471    })?;
3472
3473    // Count total filter values across all keys (max 150)
3474    let mut total_values = 0;
3475    for (key, value) in &policy {
3476        // Skip special operators like $or
3477        if key.starts_with('$') {
3478            continue;
3479        }
3480        if let Some(arr) = value.as_array() {
3481            total_values += arr.len();
3482            for item in arr {
3483                validate_filter_policy_value(item)?;
3484            }
3485        }
3486    }
3487    if total_values > 150 {
3488        return Err(AwsServiceError::aws_error(
3489            StatusCode::BAD_REQUEST,
3490            "InvalidParameter",
3491            "Invalid parameter: FilterPolicy: Filter policy is too complex",
3492        ));
3493    }
3494
3495    Ok(())
3496}
3497
/// Known match type keys for filter policy objects.
///
/// `validate_filter_policy_value` rejects any object condition that contains a
/// key not listed here with an "Unrecognized match type" error.
const VALID_FILTER_MATCH_TYPES: &[&str] = &[
    "exists",
    "prefix",
    "suffix",
    "anything-but",
    "numeric",
    "equals-ignore-case",
];
3507
3508/// Validate a single filter policy value.
3509fn validate_filter_policy_value(value: &Value) -> Result<(), AwsServiceError> {
3510    match value {
3511        Value::String(_) | Value::Bool(_) | Value::Null => Ok(()),
3512        Value::Number(n) => {
3513            // Number values must be within range
3514            if let Some(f) = n.as_f64() {
3515                if f.abs() >= 1_000_000_000.0 {
3516                    return Err(AwsServiceError::aws_error(
3517                        StatusCode::INTERNAL_SERVER_ERROR,
3518                        "InternalError",
3519                        format!(
3520                            "Invalid parameter: FilterPolicy: Match value {} must be smaller than 1E9",
3521                            n
3522                        ),
3523                    ));
3524                }
3525            }
3526            Ok(())
3527        }
3528        Value::Array(_) => Err(AwsServiceError::aws_error(
3529            StatusCode::BAD_REQUEST,
3530            "InvalidParameter",
3531            "Invalid parameter: FilterPolicy: Match value must be String, number, true, false, or null",
3532        )),
3533        Value::Object(obj) => {
3534            if let Some(exists_val) = obj.get("exists") {
3535                if !exists_val.is_boolean() {
3536                    return Err(AwsServiceError::aws_error(
3537                        StatusCode::BAD_REQUEST,
3538                        "InvalidParameter",
3539                        "Invalid parameter: FilterPolicy: exists match pattern must be either true or false.",
3540                    ));
3541                }
3542            }
3543            // Validate that object keys are recognized match types
3544            for key in obj.keys() {
3545                if !VALID_FILTER_MATCH_TYPES.contains(&key.as_str()) {
3546                    return Err(AwsServiceError::aws_error(
3547                        StatusCode::BAD_REQUEST,
3548                        "InvalidParameter",
3549                        format!(
3550                            "Invalid parameter: FilterPolicy: Unrecognized match type {key}"
3551                        ),
3552                    ));
3553                }
3554            }
3555            // Validate numeric filter operands
3556            if let Some(numeric_val) = obj.get("numeric") {
3557                if let Some(arr) = numeric_val.as_array() {
3558                    validate_numeric_filter(arr)?;
3559                }
3560            }
3561            Ok(())
3562        }
3563    }
3564}
3565
/// Comparison operators accepted as the first operator of a `numeric` match pattern.
const VALID_NUMERIC_OPS: &[&str] = &["=", "<", "<=", ">", ">="];
/// Operators that may open a range expression (lower bound).
const LOWER_OPS: &[&str] = &[">", ">="];
/// Operators that may close a range expression (upper bound).
const UPPER_OPS: &[&str] = &["<", "<="];
3569
3570fn validate_numeric_filter(arr: &[Value]) -> Result<(), AwsServiceError> {
3571    // Empty array
3572    if arr.is_empty() {
3573        return Err(AwsServiceError::aws_error(
3574            StatusCode::BAD_REQUEST,
3575            "InvalidParameter",
3576            "Invalid parameter: Attributes Reason: FilterPolicy: Invalid member in numeric match: ]\n at ...",
3577        ));
3578    }
3579
3580    // First element must be a string operator
3581    let first_op = match arr[0].as_str() {
3582        Some(s) => s,
3583        None => {
3584            return Err(AwsServiceError::aws_error(
3585                StatusCode::BAD_REQUEST,
3586                "InvalidParameter",
3587                format!(
3588                    "Invalid parameter: Attributes Reason: FilterPolicy: Invalid member in numeric match: {}\n at ...",
3589                    arr[0]
3590                ),
3591            ));
3592        }
3593    };
3594
3595    // Must be a recognized operator
3596    if !VALID_NUMERIC_OPS.contains(&first_op) {
3597        return Err(AwsServiceError::aws_error(
3598            StatusCode::BAD_REQUEST,
3599            "InvalidParameter",
3600            format!(
3601                "Invalid parameter: Attributes Reason: FilterPolicy: Unrecognized numeric range operator: {first_op}\n at ..."
3602            ),
3603        ));
3604    }
3605
3606    // Must have a value after the operator
3607    if arr.len() < 2 {
3608        return Err(AwsServiceError::aws_error(
3609            StatusCode::BAD_REQUEST,
3610            "InvalidParameter",
3611            format!(
3612                "Invalid parameter: Attributes Reason: FilterPolicy: Value of {first_op} must be numeric\n at ..."
3613            ),
3614        ));
3615    }
3616
3617    // Value must be numeric
3618    if !arr[1].is_number() {
3619        return Err(AwsServiceError::aws_error(
3620            StatusCode::BAD_REQUEST,
3621            "InvalidParameter",
3622            format!(
3623                "Invalid parameter: Attributes Reason: FilterPolicy: Value of {first_op} must be numeric\n at ..."
3624            ),
3625        ));
3626    }
3627
3628    // Numeric operand must be smaller than 1E9
3629    if let Some(f) = arr[1].as_f64() {
3630        if f.abs() >= 1_000_000_000.0 {
3631            return Err(AwsServiceError::aws_error(
3632                StatusCode::BAD_REQUEST,
3633                "InvalidParameter",
3634                format!(
3635                    "Invalid parameter: FilterPolicy: Numeric match value must be smaller than 1E9, got {}",
3636                    arr[1]
3637                ),
3638            ));
3639        }
3640    }
3641
3642    // Single comparison (2 elements): valid
3643    if arr.len() == 2 {
3644        return Ok(());
3645    }
3646
3647    // Range expression: must have exactly 4 elements
3648    if arr.len() > 4 {
3649        return Err(AwsServiceError::aws_error(
3650            StatusCode::BAD_REQUEST,
3651            "InvalidParameter",
3652            "Invalid parameter: Attributes Reason: FilterPolicy: Too many elements in numeric expression\n at ...",
3653        ));
3654    }
3655
3656    if arr.len() < 4 {
3657        // 3 elements: op, val, op_missing_value
3658        if let Some(op2) = arr[2].as_str() {
3659            return Err(AwsServiceError::aws_error(
3660                StatusCode::BAD_REQUEST,
3661                "InvalidParameter",
3662                format!(
3663                    "Invalid parameter: Attributes Reason: FilterPolicy: Value of {op2} must be numeric\n at ..."
3664                ),
3665            ));
3666        }
3667        return Err(AwsServiceError::aws_error(
3668            StatusCode::BAD_REQUEST,
3669            "InvalidParameter",
3670            "Invalid parameter: Attributes Reason: FilterPolicy: Too many elements in numeric expression\n at ...",
3671        ));
3672    }
3673
3674    // Exactly 4 elements: range expression
3675    let second_op = match arr[2].as_str() {
3676        Some(s) => s,
3677        None => {
3678            return Err(AwsServiceError::aws_error(
3679                StatusCode::BAD_REQUEST,
3680                "InvalidParameter",
3681                format!(
3682                    "Invalid parameter: Attributes Reason: FilterPolicy: Invalid member in numeric match: {}\n at ...",
3683                    arr[2]
3684                ),
3685            ));
3686        }
3687    };
3688
3689    if !arr[3].is_number() {
3690        return Err(AwsServiceError::aws_error(
3691            StatusCode::BAD_REQUEST,
3692            "InvalidParameter",
3693            format!(
3694                "Invalid parameter: Attributes Reason: FilterPolicy: Value of {second_op} must be numeric\n at ..."
3695            ),
3696        ));
3697    }
3698
3699    // Numeric operand must be smaller than 1E9
3700    if let Some(f) = arr[3].as_f64() {
3701        if f.abs() >= 1_000_000_000.0 {
3702            return Err(AwsServiceError::aws_error(
3703                StatusCode::BAD_REQUEST,
3704                "InvalidParameter",
3705                format!(
3706                    "Invalid parameter: FilterPolicy: Numeric match value must be smaller than 1E9, got {}",
3707                    arr[3]
3708                ),
3709            ));
3710        }
3711    }
3712
3713    // For a range, first op must be lower bound (> or >=) and second op must be upper bound (< or <=)
3714    let first_is_lower = LOWER_OPS.contains(&first_op);
3715    let second_is_upper = UPPER_OPS.contains(&second_op);
3716
3717    if first_is_lower && !second_is_upper {
3718        return Err(AwsServiceError::aws_error(
3719            StatusCode::BAD_REQUEST,
3720            "InvalidParameter",
3721            format!(
3722                "Invalid parameter: Attributes Reason: FilterPolicy: Bad numeric range operator: {second_op}\n at ..."
3723            ),
3724        ));
3725    }
3726
3727    if !first_is_lower {
3728        return Err(AwsServiceError::aws_error(
3729            StatusCode::BAD_REQUEST,
3730            "InvalidParameter",
3731            "Invalid parameter: Attributes Reason: FilterPolicy: Too many elements in numeric expression\n at ...",
3732        ));
3733    }
3734
3735    // Bottom must be less than top
3736    let bottom = arr[1].as_f64().unwrap_or(0.0);
3737    let top = arr[3].as_f64().unwrap_or(0.0);
3738    if bottom >= top {
3739        return Err(AwsServiceError::aws_error(
3740            StatusCode::BAD_REQUEST,
3741            "InvalidParameter",
3742            "Invalid parameter: Attributes Reason: FilterPolicy: Bottom must be less than top\n at ...",
3743        ));
3744    }
3745
3746    Ok(())
3747}
3748
3749#[cfg(test)]
3750mod tests {
3751    use super::*;
3752
3753    #[test]
3754    fn validate_message_structure_json_rejects_invalid_json() {
3755        let result = validate_message_structure_json("not valid json");
3756        assert!(result.is_err());
3757        let err = result.unwrap_err();
3758        let msg = format!("{err}");
3759        assert!(msg.contains("No JSON message body is parseable"), "{msg}");
3760    }
3761
3762    #[test]
3763    fn validate_message_structure_json_rejects_missing_default_key() {
3764        let result = validate_message_structure_json(r#"{"sqs": "hello"}"#);
3765        assert!(result.is_err());
3766        let err = result.unwrap_err();
3767        let msg = format!("{err}");
3768        assert!(
3769            msg.contains("No default entry in JSON message body"),
3770            "{msg}"
3771        );
3772    }
3773
3774    #[test]
3775    fn validate_message_structure_json_accepts_valid() {
3776        let result =
3777            validate_message_structure_json(r#"{"default": "hello", "sqs": "hello from sqs"}"#);
3778        assert!(result.is_ok());
3779    }
3780
3781    #[test]
3782    fn build_sns_lambda_event_uses_real_subscription_arn() {
3783        let now = Utc::now();
3784        let sub_arn = "arn:aws:sns:us-east-1:123456789012:my-topic:abc-def-123";
3785        let topic_arn = "arn:aws:sns:us-east-1:123456789012:my-topic";
3786        let attrs = serde_json::Map::new();
3787
3788        let payload = build_sns_lambda_event(&SnsLambdaEventInput {
3789            message_id: "msg-001",
3790            topic_arn,
3791            subscription_arn: sub_arn,
3792            message: "hello",
3793            subject: Some("test subject"),
3794            message_attributes: &attrs,
3795            timestamp: &now,
3796            endpoint: "http://localhost:4566",
3797        });
3798
3799        let parsed: Value = serde_json::from_str(&payload).unwrap();
3800        let record = &parsed["Records"][0];
3801        assert_eq!(record["EventSubscriptionArn"], sub_arn);
3802        assert_eq!(record["EventSource"], "aws:sns");
3803        assert_eq!(record["Sns"]["TopicArn"], topic_arn);
3804        assert_eq!(record["Sns"]["Message"], "hello");
3805        assert_eq!(record["Sns"]["Subject"], "test subject");
3806        assert_eq!(record["Sns"]["MessageId"], "msg-001");
3807        // UnsubscribeUrl should use subscription ARN, not topic ARN
3808        let unsub_url = record["Sns"]["UnsubscribeUrl"].as_str().unwrap();
3809        assert!(
3810            unsub_url.contains(sub_arn),
3811            "UnsubscribeUrl should contain subscription ARN"
3812        );
3813    }
3814
3815    #[test]
3816    fn build_sns_envelope_uses_configured_endpoint() {
3817        let endpoint = "http://myhost:5555";
3818        let topic_arn = "arn:aws:sns:us-east-1:123456789012:my-topic";
3819        let attrs = serde_json::Map::new();
3820
3821        let envelope = build_sns_envelope(
3822            "msg-002",
3823            topic_arn,
3824            &None,
3825            "test message",
3826            &attrs,
3827            endpoint,
3828        );
3829
3830        let parsed: Value = serde_json::from_str(&envelope).unwrap();
3831        let unsub_url = parsed["UnsubscribeURL"].as_str().unwrap();
3832        assert!(
3833            unsub_url.starts_with("http://myhost:5555/"),
3834            "UnsubscribeURL should use the configured endpoint, got: {unsub_url}"
3835        );
3836        assert!(
3837            unsub_url.contains(topic_arn),
3838            "UnsubscribeURL should contain topic ARN"
3839        );
3840    }
3841
3842    #[test]
3843    fn build_sns_lambda_event_uses_configured_endpoint() {
3844        let now = Utc::now();
3845        let sub_arn = "arn:aws:sns:us-east-1:123456789012:my-topic:abc-def-123";
3846        let attrs = serde_json::Map::new();
3847        let endpoint = "http://custom:9999";
3848
3849        let payload = build_sns_lambda_event(&SnsLambdaEventInput {
3850            message_id: "msg-003",
3851            topic_arn: "arn:aws:sns:us-east-1:123456789012:my-topic",
3852            subscription_arn: sub_arn,
3853            message: "hello",
3854            subject: None,
3855            message_attributes: &attrs,
3856            timestamp: &now,
3857            endpoint,
3858        });
3859
3860        let parsed: Value = serde_json::from_str(&payload).unwrap();
3861        let unsub_url = parsed["Records"][0]["Sns"]["UnsubscribeUrl"]
3862            .as_str()
3863            .unwrap();
3864        assert!(
3865            unsub_url.starts_with("http://custom:9999/"),
3866            "UnsubscribeUrl should use configured endpoint, got: {unsub_url}"
3867        );
3868    }
3869
3870    #[test]
3871    fn add_permission_with_invalid_policy_returns_error_not_panic() {
3872        use fakecloud_core::delivery::DeliveryBus;
3873        use parking_lot::RwLock;
3874        use std::sync::Arc;
3875
3876        let state = Arc::new(RwLock::new(crate::state::SnsState::new(
3877            "123456789012",
3878            "us-east-1",
3879            "http://localhost:4566",
3880        )));
3881        let delivery = Arc::new(DeliveryBus::new());
3882        let svc = SnsService::new(state.clone(), delivery);
3883
3884        // Create a topic first
3885        let topic_arn = "arn:aws:sns:us-east-1:123456789012:test-topic";
3886        {
3887            let mut s = state.write();
3888            s.topics.insert(
3889                topic_arn.to_string(),
3890                crate::state::SnsTopic {
3891                    topic_arn: topic_arn.to_string(),
3892                    name: "test-topic".to_string(),
3893                    attributes: {
3894                        let mut m = std::collections::HashMap::new();
3895                        // Set an intentionally broken JSON policy
3896                        m.insert("Policy".to_string(), "not valid json {{{".to_string());
3897                        m
3898                    },
3899                    is_fifo: false,
3900                    tags: vec![],
3901                    created_at: Utc::now(),
3902                },
3903            );
3904        }
3905
3906        // Build an AddPermission request
3907        let body = format!(
3908            "Action=AddPermission&TopicArn={}&Label=TestLabel&ActionName.member.1=Publish&AWSAccountId.member.1=111111111111",
3909            topic_arn
3910        );
3911        let req = fakecloud_core::service::AwsRequest {
3912            service: "sns".to_string(),
3913            action: "AddPermission".to_string(),
3914            region: "us-east-1".to_string(),
3915            account_id: "123456789012".to_string(),
3916            request_id: "test-req".to_string(),
3917            headers: http::HeaderMap::new(),
3918            query_params: std::collections::HashMap::new(),
3919            body: body.into(),
3920            path_segments: vec![],
3921            raw_path: "/".to_string(),
3922            raw_query: String::new(),
3923            method: http::Method::POST,
3924            is_query_protocol: true,
3925            access_key_id: None,
3926            principal: None,
3927        };
3928
3929        // This should return an error, not panic
3930        let result = svc.add_permission(&req);
3931        assert!(
3932            result.is_err(),
3933            "Invalid policy JSON should return error, not panic"
3934        );
3935    }
3936
3937    // --- Helper to build SNS service + state for integration-style unit tests ---
3938
3939    fn make_sns() -> (SnsService, crate::state::SharedSnsState) {
3940        use fakecloud_core::delivery::DeliveryBus;
3941        use parking_lot::RwLock;
3942        use std::sync::Arc;
3943
3944        let state = Arc::new(RwLock::new(crate::state::SnsState::new(
3945            "123456789012",
3946            "us-east-1",
3947            "http://localhost:4566",
3948        )));
3949        let delivery = Arc::new(DeliveryBus::new());
3950        let svc = SnsService::new(state.clone(), delivery);
3951        (svc, state)
3952    }
3953
3954    fn sns_request(action: &str, params: Vec<(&str, &str)>) -> fakecloud_core::service::AwsRequest {
3955        let mut query_params = std::collections::HashMap::new();
3956        query_params.insert("Action".to_string(), action.to_string());
3957        for (k, v) in params {
3958            query_params.insert(k.to_string(), v.to_string());
3959        }
3960        fakecloud_core::service::AwsRequest {
3961            service: "sns".to_string(),
3962            action: action.to_string(),
3963            region: "us-east-1".to_string(),
3964            account_id: "123456789012".to_string(),
3965            request_id: "test-req".to_string(),
3966            headers: http::HeaderMap::new(),
3967            query_params,
3968            body: Default::default(),
3969            path_segments: vec![],
3970            raw_path: "/".to_string(),
3971            raw_query: String::new(),
3972            method: http::Method::POST,
3973            is_query_protocol: true,
3974            access_key_id: None,
3975            principal: None,
3976        }
3977    }
3978
3979    fn assert_ok(result: &Result<AwsResponse, AwsServiceError>) {
3980        assert!(
3981            result.is_ok(),
3982            "Expected Ok, got: {:?}",
3983            result.as_ref().err()
3984        );
3985    }
3986
3987    fn response_body(result: &Result<AwsResponse, AwsServiceError>) -> String {
3988        String::from_utf8(result.as_ref().unwrap().body.expect_bytes().to_vec()).unwrap()
3989    }
3990
3991    // --- Subscribe / Unsubscribe / ListSubscriptions / ListSubscriptionsByTopic ---
3992
3993    #[test]
3994    fn subscribe_creates_subscription() {
3995        let (svc, _state) = make_sns();
3996        // Create topic first
3997        let req = sns_request("CreateTopic", vec![("Name", "my-topic")]);
3998        assert_ok(&svc.create_topic(&req));
3999
4000        let topic_arn = "arn:aws:sns:us-east-1:123456789012:my-topic";
4001        let req = sns_request(
4002            "Subscribe",
4003            vec![
4004                ("TopicArn", topic_arn),
4005                ("Protocol", "email"),
4006                ("Endpoint", "user@example.com"),
4007            ],
4008        );
4009        let result = svc.subscribe(&req);
4010        assert_ok(&result);
4011        let body = response_body(&result);
4012        assert!(
4013            body.contains("<SubscriptionArn>"),
4014            "Response should contain SubscriptionArn"
4015        );
4016        assert!(
4017            body.contains(topic_arn),
4018            "SubscriptionArn should contain topic ARN"
4019        );
4020    }
4021
4022    #[test]
4023    fn subscribe_duplicate_returns_existing_arn() {
4024        let (svc, _state) = make_sns();
4025        let req = sns_request("CreateTopic", vec![("Name", "dup-topic")]);
4026        assert_ok(&svc.create_topic(&req));
4027
4028        let topic_arn = "arn:aws:sns:us-east-1:123456789012:dup-topic";
4029        let params = vec![
4030            ("TopicArn", topic_arn),
4031            ("Protocol", "email"),
4032            ("Endpoint", "user@example.com"),
4033        ];
4034        let r1 = svc.subscribe(&sns_request("Subscribe", params.clone()));
4035        assert_ok(&r1);
4036        let body1 = response_body(&r1);
4037
4038        let r2 = svc.subscribe(&sns_request("Subscribe", params));
4039        assert_ok(&r2);
4040        let body2 = response_body(&r2);
4041
4042        // Both should return same subscription ARN
4043        assert_eq!(body1, body2, "Duplicate subscribe should return same ARN");
4044    }
4045
4046    #[test]
4047    fn unsubscribe_removes_subscription() {
4048        let (svc, state) = make_sns();
4049        let req = sns_request("CreateTopic", vec![("Name", "unsub-topic")]);
4050        assert_ok(&svc.create_topic(&req));
4051
4052        let topic_arn = "arn:aws:sns:us-east-1:123456789012:unsub-topic";
4053        let req = sns_request(
4054            "Subscribe",
4055            vec![
4056                ("TopicArn", topic_arn),
4057                ("Protocol", "email"),
4058                ("Endpoint", "user@example.com"),
4059            ],
4060        );
4061        assert_ok(&svc.subscribe(&req));
4062
4063        // Get subscription ARN from state
4064        let sub_arn = {
4065            let s = state.read();
4066            s.subscriptions.keys().next().unwrap().clone()
4067        };
4068
4069        let req = sns_request("Unsubscribe", vec![("SubscriptionArn", &sub_arn)]);
4070        assert_ok(&svc.unsubscribe(&req));
4071
4072        let s = state.read();
4073        assert!(s.subscriptions.is_empty(), "Subscription should be removed");
4074    }
4075
4076    #[test]
4077    fn list_subscriptions_returns_all() {
4078        let (svc, _state) = make_sns();
4079        let req = sns_request("CreateTopic", vec![("Name", "list-topic")]);
4080        assert_ok(&svc.create_topic(&req));
4081
4082        let topic_arn = "arn:aws:sns:us-east-1:123456789012:list-topic";
4083        for i in 0..3 {
4084            let email = format!("user{}@example.com", i);
4085            let req = sns_request(
4086                "Subscribe",
4087                vec![
4088                    ("TopicArn", topic_arn),
4089                    ("Protocol", "email"),
4090                    ("Endpoint", &email),
4091                ],
4092            );
4093            assert_ok(&svc.subscribe(&req));
4094        }
4095
4096        let req = sns_request("ListSubscriptions", vec![]);
4097        let result = svc.list_subscriptions(&req);
4098        assert_ok(&result);
4099        let body = response_body(&result);
4100        // Should contain all 3 subscriptions
4101        let count = body.matches("<member>").count();
4102        assert_eq!(count, 3, "Should list 3 subscriptions, found {}", count);
4103    }
4104
4105    #[test]
4106    fn list_subscriptions_by_topic_filters_correctly() {
4107        let (svc, _state) = make_sns();
4108        // Create two topics
4109        assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "topicA")])));
4110        assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "topicB")])));
4111
4112        let arn_a = "arn:aws:sns:us-east-1:123456789012:topicA";
4113        let arn_b = "arn:aws:sns:us-east-1:123456789012:topicB";
4114
4115        // Subscribe 2 to A, 1 to B
4116        assert_ok(&svc.subscribe(&sns_request(
4117            "Subscribe",
4118            vec![
4119                ("TopicArn", arn_a),
4120                ("Protocol", "email"),
4121                ("Endpoint", "a1@example.com"),
4122            ],
4123        )));
4124        assert_ok(&svc.subscribe(&sns_request(
4125            "Subscribe",
4126            vec![
4127                ("TopicArn", arn_a),
4128                ("Protocol", "email"),
4129                ("Endpoint", "a2@example.com"),
4130            ],
4131        )));
4132        assert_ok(&svc.subscribe(&sns_request(
4133            "Subscribe",
4134            vec![
4135                ("TopicArn", arn_b),
4136                ("Protocol", "email"),
4137                ("Endpoint", "b1@example.com"),
4138            ],
4139        )));
4140
4141        let req = sns_request("ListSubscriptionsByTopic", vec![("TopicArn", arn_a)]);
4142        let result = svc.list_subscriptions_by_topic(&req);
4143        assert_ok(&result);
4144        let body = response_body(&result);
4145        let count = body.matches("<member>").count();
4146        assert_eq!(
4147            count, 2,
4148            "Topic A should have 2 subscriptions, found {}",
4149            count
4150        );
4151    }
4152
4153    // --- Publish / PublishBatch ---
4154
4155    #[test]
4156    fn publish_to_topic_stores_message() {
4157        let (svc, state) = make_sns();
4158        assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "pub-topic")])));
4159
4160        let topic_arn = "arn:aws:sns:us-east-1:123456789012:pub-topic";
4161        let req = sns_request(
4162            "Publish",
4163            vec![
4164                ("TopicArn", topic_arn),
4165                ("Message", "Hello world"),
4166                ("Subject", "Test subject"),
4167            ],
4168        );
4169        let result = svc.publish(&req);
4170        assert_ok(&result);
4171        let body = response_body(&result);
4172        assert!(
4173            body.contains("<MessageId>"),
4174            "Response should contain MessageId"
4175        );
4176
4177        let s = state.read();
4178        assert_eq!(s.published.len(), 1);
4179        assert_eq!(s.published[0].message, "Hello world");
4180        assert_eq!(s.published[0].subject.as_deref(), Some("Test subject"));
4181    }
4182
4183    #[test]
4184    fn publish_to_nonexistent_topic_returns_error() {
4185        let (svc, _state) = make_sns();
4186        let req = sns_request(
4187            "Publish",
4188            vec![
4189                ("TopicArn", "arn:aws:sns:us-east-1:123456789012:nope"),
4190                ("Message", "Hello"),
4191            ],
4192        );
4193        let result = svc.publish(&req);
4194        assert!(result.is_err(), "Publish to nonexistent topic should error");
4195    }
4196
4197    #[test]
4198    fn publish_without_topic_or_phone_returns_error() {
4199        let (svc, _state) = make_sns();
4200        let req = sns_request("Publish", vec![("Message", "Hello")]);
4201        let result = svc.publish(&req);
4202        assert!(result.is_err(), "Publish without target should error");
4203    }
4204
4205    #[test]
4206    fn publish_validates_subject_length() {
4207        let (svc, _state) = make_sns();
4208        assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "subj-topic")])));
4209
4210        let topic_arn = "arn:aws:sns:us-east-1:123456789012:subj-topic";
4211        let long_subject = "x".repeat(101);
4212        let req = sns_request(
4213            "Publish",
4214            vec![
4215                ("TopicArn", topic_arn),
4216                ("Message", "Hello"),
4217                ("Subject", &long_subject),
4218            ],
4219        );
4220        let result = svc.publish(&req);
4221        assert!(result.is_err(), "Subject > 100 chars should error");
4222    }
4223
4224    #[test]
4225    fn publish_to_sms_phone_number() {
4226        let (svc, state) = make_sns();
4227        let req = sns_request(
4228            "Publish",
4229            vec![("PhoneNumber", "+15551234567"), ("Message", "SMS test")],
4230        );
4231        let result = svc.publish(&req);
4232        assert_ok(&result);
4233
4234        let s = state.read();
4235        assert_eq!(s.sms_messages.len(), 1);
4236        assert_eq!(s.sms_messages[0].0, "+15551234567");
4237        assert_eq!(s.sms_messages[0].1, "SMS test");
4238    }
4239
4240    #[test]
4241    fn publish_to_invalid_phone_returns_error() {
4242        let (svc, _state) = make_sns();
4243        let req = sns_request(
4244            "Publish",
4245            vec![("PhoneNumber", "not-a-phone"), ("Message", "SMS test")],
4246        );
4247        let result = svc.publish(&req);
4248        assert!(result.is_err(), "Invalid phone should error");
4249    }
4250
4251    #[test]
4252    fn publish_batch_stores_multiple_messages() {
4253        let (svc, state) = make_sns();
4254        assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "batch-topic")])));
4255
4256        let topic_arn = "arn:aws:sns:us-east-1:123456789012:batch-topic";
4257        let req = sns_request(
4258            "PublishBatch",
4259            vec![
4260                ("TopicArn", topic_arn),
4261                ("PublishBatchRequestEntries.member.1.Id", "msg1"),
4262                ("PublishBatchRequestEntries.member.1.Message", "Hello 1"),
4263                ("PublishBatchRequestEntries.member.2.Id", "msg2"),
4264                ("PublishBatchRequestEntries.member.2.Message", "Hello 2"),
4265            ],
4266        );
4267        let result = svc.publish_batch(&req);
4268        assert_ok(&result);
4269        let body = response_body(&result);
4270        assert!(
4271            body.contains("<Successful>"),
4272            "Response should contain Successful element"
4273        );
4274
4275        let s = state.read();
4276        assert_eq!(s.published.len(), 2);
4277    }
4278
4279    #[test]
4280    fn publish_batch_rejects_duplicate_ids() {
4281        let (svc, _state) = make_sns();
4282        assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "batch-dup")])));
4283
4284        let topic_arn = "arn:aws:sns:us-east-1:123456789012:batch-dup";
4285        let req = sns_request(
4286            "PublishBatch",
4287            vec![
4288                ("TopicArn", topic_arn),
4289                ("PublishBatchRequestEntries.member.1.Id", "same"),
4290                ("PublishBatchRequestEntries.member.1.Message", "Hello 1"),
4291                ("PublishBatchRequestEntries.member.2.Id", "same"),
4292                ("PublishBatchRequestEntries.member.2.Message", "Hello 2"),
4293            ],
4294        );
4295        let result = svc.publish_batch(&req);
4296        assert!(result.is_err(), "Duplicate batch IDs should error");
4297    }
4298
4299    // --- SetSubscriptionAttributes / GetSubscriptionAttributes ---
4300
4301    #[test]
4302    fn get_subscription_attributes_returns_defaults() {
4303        let (svc, state) = make_sns();
4304        assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "attr-topic")])));
4305
4306        let topic_arn = "arn:aws:sns:us-east-1:123456789012:attr-topic";
4307        assert_ok(&svc.subscribe(&sns_request(
4308            "Subscribe",
4309            vec![
4310                ("TopicArn", topic_arn),
4311                ("Protocol", "email"),
4312                ("Endpoint", "u@example.com"),
4313            ],
4314        )));
4315
4316        let sub_arn = {
4317            let s = state.read();
4318            s.subscriptions.keys().next().unwrap().clone()
4319        };
4320
4321        let req = sns_request(
4322            "GetSubscriptionAttributes",
4323            vec![("SubscriptionArn", &sub_arn)],
4324        );
4325        let result = svc.get_subscription_attributes(&req);
4326        assert_ok(&result);
4327        let body = response_body(&result);
4328        assert!(
4329            body.contains("<key>Protocol</key>"),
4330            "Should contain Protocol attribute"
4331        );
4332        assert!(
4333            body.contains("<value>email</value>"),
4334            "Protocol should be email"
4335        );
4336        assert!(
4337            body.contains("<key>Endpoint</key>"),
4338            "Should contain Endpoint attribute"
4339        );
4340        assert!(
4341            body.contains("<key>RawMessageDelivery</key>"),
4342            "Should contain RawMessageDelivery"
4343        );
4344    }
4345
4346    #[test]
4347    fn set_subscription_attributes_updates_value() {
4348        let (svc, state) = make_sns();
4349        assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "setattr-topic")])));
4350
4351        let topic_arn = "arn:aws:sns:us-east-1:123456789012:setattr-topic";
4352        assert_ok(&svc.subscribe(&sns_request(
4353            "Subscribe",
4354            vec![
4355                ("TopicArn", topic_arn),
4356                ("Protocol", "email"),
4357                ("Endpoint", "u@example.com"),
4358            ],
4359        )));
4360
4361        let sub_arn = {
4362            let s = state.read();
4363            s.subscriptions.keys().next().unwrap().clone()
4364        };
4365
4366        // Set RawMessageDelivery to true
4367        let req = sns_request(
4368            "SetSubscriptionAttributes",
4369            vec![
4370                ("SubscriptionArn", &sub_arn),
4371                ("AttributeName", "RawMessageDelivery"),
4372                ("AttributeValue", "true"),
4373            ],
4374        );
4375        assert_ok(&svc.set_subscription_attributes(&req));
4376
4377        // Verify in state
4378        let s = state.read();
4379        let sub = s.subscriptions.get(&sub_arn).unwrap();
4380        assert_eq!(sub.attributes.get("RawMessageDelivery").unwrap(), "true");
4381    }
4382
4383    #[test]
4384    fn set_subscription_attributes_rejects_invalid_attr() {
4385        let (svc, state) = make_sns();
4386        assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "inv-attr")])));
4387
4388        let topic_arn = "arn:aws:sns:us-east-1:123456789012:inv-attr";
4389        assert_ok(&svc.subscribe(&sns_request(
4390            "Subscribe",
4391            vec![
4392                ("TopicArn", topic_arn),
4393                ("Protocol", "email"),
4394                ("Endpoint", "u@example.com"),
4395            ],
4396        )));
4397
4398        let sub_arn = {
4399            let s = state.read();
4400            s.subscriptions.keys().next().unwrap().clone()
4401        };
4402
4403        let req = sns_request(
4404            "SetSubscriptionAttributes",
4405            vec![
4406                ("SubscriptionArn", &sub_arn),
4407                ("AttributeName", "BogusAttribute"),
4408                ("AttributeValue", "whatever"),
4409            ],
4410        );
4411        let result = svc.set_subscription_attributes(&req);
4412        assert!(result.is_err(), "Invalid attribute name should error");
4413    }
4414
4415    #[test]
4416    fn get_subscription_attributes_nonexistent_returns_error() {
4417        let (svc, _state) = make_sns();
4418        let req = sns_request(
4419            "GetSubscriptionAttributes",
4420            vec![(
4421                "SubscriptionArn",
4422                "arn:aws:sns:us-east-1:123456789012:nope:fake",
4423            )],
4424        );
4425        let result = svc.get_subscription_attributes(&req);
4426        assert!(result.is_err(), "Nonexistent subscription should error");
4427    }
4428
4429    // --- TagResource / UntagResource / ListTagsForResource ---
4430
4431    #[test]
4432    fn tag_untag_list_tags_lifecycle() {
4433        let (svc, _state) = make_sns();
4434        assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "tag-topic")])));
4435
4436        let topic_arn = "arn:aws:sns:us-east-1:123456789012:tag-topic";
4437
4438        // Tag the resource
4439        let req = sns_request(
4440            "TagResource",
4441            vec![
4442                ("ResourceArn", topic_arn),
4443                ("Tags.member.1.Key", "env"),
4444                ("Tags.member.1.Value", "prod"),
4445                ("Tags.member.2.Key", "team"),
4446                ("Tags.member.2.Value", "platform"),
4447            ],
4448        );
4449        assert_ok(&svc.tag_resource(&req));
4450
4451        // List tags
4452        let req = sns_request("ListTagsForResource", vec![("ResourceArn", topic_arn)]);
4453        let result = svc.list_tags_for_resource(&req);
4454        assert_ok(&result);
4455        let body = response_body(&result);
4456        assert!(
4457            body.contains("<Key>env</Key>"),
4458            "Should contain env tag key"
4459        );
4460        assert!(
4461            body.contains("<Value>prod</Value>"),
4462            "Should contain prod tag value"
4463        );
4464        assert!(
4465            body.contains("<Key>team</Key>"),
4466            "Should contain team tag key"
4467        );
4468
4469        // Untag one key
4470        let req = sns_request(
4471            "UntagResource",
4472            vec![("ResourceArn", topic_arn), ("TagKeys.member.1", "env")],
4473        );
4474        assert_ok(&svc.untag_resource(&req));
4475
4476        // Verify only team remains
4477        let req = sns_request("ListTagsForResource", vec![("ResourceArn", topic_arn)]);
4478        let result = svc.list_tags_for_resource(&req);
4479        assert_ok(&result);
4480        let body = response_body(&result);
4481        assert!(
4482            !body.contains("<Key>env</Key>"),
4483            "env tag should be removed"
4484        );
4485        assert!(body.contains("<Key>team</Key>"), "team tag should remain");
4486    }
4487
4488    #[test]
4489    fn tag_resource_nonexistent_returns_error() {
4490        let (svc, _state) = make_sns();
4491        let req = sns_request(
4492            "TagResource",
4493            vec![
4494                ("ResourceArn", "arn:aws:sns:us-east-1:123456789012:nope"),
4495                ("Tags.member.1.Key", "k"),
4496                ("Tags.member.1.Value", "v"),
4497            ],
4498        );
4499        let result = svc.tag_resource(&req);
4500        assert!(result.is_err(), "Tagging nonexistent resource should error");
4501    }
4502
4503    #[test]
4504    fn tag_resource_overwrites_existing_key() {
4505        let (svc, _state) = make_sns();
4506        assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "tag-overwrite")])));
4507
4508        let topic_arn = "arn:aws:sns:us-east-1:123456789012:tag-overwrite";
4509
4510        // Add tag
4511        let req = sns_request(
4512            "TagResource",
4513            vec![
4514                ("ResourceArn", topic_arn),
4515                ("Tags.member.1.Key", "env"),
4516                ("Tags.member.1.Value", "dev"),
4517            ],
4518        );
4519        assert_ok(&svc.tag_resource(&req));
4520
4521        // Overwrite tag
4522        let req = sns_request(
4523            "TagResource",
4524            vec![
4525                ("ResourceArn", topic_arn),
4526                ("Tags.member.1.Key", "env"),
4527                ("Tags.member.1.Value", "prod"),
4528            ],
4529        );
4530        assert_ok(&svc.tag_resource(&req));
4531
4532        // Verify overwritten
4533        let req = sns_request("ListTagsForResource", vec![("ResourceArn", topic_arn)]);
4534        let body = response_body(&svc.list_tags_for_resource(&req));
4535        assert!(
4536            body.contains("<Value>prod</Value>"),
4537            "Tag value should be overwritten to prod"
4538        );
4539        // Should only have 1 member
4540        assert_eq!(
4541            body.matches("<member>").count(),
4542            1,
4543            "Should have exactly 1 tag"
4544        );
4545    }
4546
4547    // --- SetTopicAttributes / GetTopicAttributes ---
4548
4549    #[test]
4550    fn set_and_get_topic_attributes() {
4551        let (svc, _state) = make_sns();
4552        assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "attr-topic2")])));
4553
4554        let topic_arn = "arn:aws:sns:us-east-1:123456789012:attr-topic2";
4555
4556        // Set DisplayName
4557        let req = sns_request(
4558            "SetTopicAttributes",
4559            vec![
4560                ("TopicArn", topic_arn),
4561                ("AttributeName", "DisplayName"),
4562                ("AttributeValue", "My Nice Topic"),
4563            ],
4564        );
4565        assert_ok(&svc.set_topic_attributes(&req));
4566
4567        // Get attributes
4568        let req = sns_request("GetTopicAttributes", vec![("TopicArn", topic_arn)]);
4569        let result = svc.get_topic_attributes(&req);
4570        assert_ok(&result);
4571        let body = response_body(&result);
4572        assert!(
4573            body.contains("<value>My Nice Topic</value>"),
4574            "DisplayName should be set"
4575        );
4576        assert!(
4577            body.contains("<key>TopicArn</key>"),
4578            "Should contain TopicArn"
4579        );
4580        assert!(body.contains("<key>Owner</key>"), "Should contain Owner");
4581    }
4582
4583    #[test]
4584    fn get_topic_attributes_nonexistent_returns_error() {
4585        let (svc, _state) = make_sns();
4586        let req = sns_request(
4587            "GetTopicAttributes",
4588            vec![("TopicArn", "arn:aws:sns:us-east-1:123456789012:nope")],
4589        );
4590        let result = svc.get_topic_attributes(&req);
4591        assert!(result.is_err(), "Nonexistent topic should error");
4592    }
4593
4594    #[test]
4595    fn get_topic_attributes_wrong_region_returns_error() {
4596        let (svc, _state) = make_sns();
4597        assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "region-topic")])));
4598
4599        // The topic was created in us-east-1, but try to get it with a different region in the ARN
4600        let req = sns_request(
4601            "GetTopicAttributes",
4602            vec![(
4603                "TopicArn",
4604                "arn:aws:sns:eu-west-1:123456789012:region-topic",
4605            )],
4606        );
4607        let result = svc.get_topic_attributes(&req);
4608        assert!(result.is_err(), "Topic in wrong region should error");
4609    }
4610
4611    // --- ConfirmSubscription ---
4612
4613    #[test]
4614    fn confirm_subscription_returns_arn() {
4615        let (svc, state) = make_sns();
4616        assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "confirm-topic")])));
4617
4618        let topic_arn = "arn:aws:sns:us-east-1:123456789012:confirm-topic";
4619
4620        // Subscribe an HTTP endpoint (starts as pending)
4621        assert_ok(&svc.subscribe(&sns_request(
4622            "Subscribe",
4623            vec![
4624                ("TopicArn", topic_arn),
4625                ("Protocol", "http"),
4626                ("Endpoint", "http://example.com/hook"),
4627            ],
4628        )));
4629
4630        // Get the token from the pending subscription
4631        let token = {
4632            let s = state.read();
4633            s.subscriptions
4634                .values()
4635                .find(|sub| sub.topic_arn == topic_arn && !sub.confirmed)
4636                .expect("should have a pending subscription")
4637                .confirmation_token
4638                .clone()
4639                .expect("pending subscription should have a token")
4640        };
4641
4642        let req = sns_request(
4643            "ConfirmSubscription",
4644            vec![("TopicArn", topic_arn), ("Token", &token)],
4645        );
4646        let result = svc.confirm_subscription(&req);
4647        assert_ok(&result);
4648        let body = response_body(&result);
4649        assert!(
4650            body.contains("<SubscriptionArn>"),
4651            "Should return a SubscriptionArn"
4652        );
4653
4654        // Verify the subscription is now confirmed
4655        let s = state.read();
4656        let sub = s
4657            .subscriptions
4658            .values()
4659            .find(|sub| sub.topic_arn == topic_arn)
4660            .expect("subscription should exist");
4661        assert!(sub.confirmed, "subscription should be confirmed");
4662    }
4663
4664    #[test]
4665    fn confirm_subscription_rejects_invalid_token() {
4666        let (svc, _state) = make_sns();
4667        assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "confirm-topic")])));
4668
4669        let topic_arn = "arn:aws:sns:us-east-1:123456789012:confirm-topic";
4670
4671        // Subscribe an HTTP endpoint (starts as pending)
4672        assert_ok(&svc.subscribe(&sns_request(
4673            "Subscribe",
4674            vec![
4675                ("TopicArn", topic_arn),
4676                ("Protocol", "http"),
4677                ("Endpoint", "http://example.com/hook"),
4678            ],
4679        )));
4680
4681        // Try to confirm with wrong token
4682        let req = sns_request(
4683            "ConfirmSubscription",
4684            vec![("TopicArn", topic_arn), ("Token", "wrong-token")],
4685        );
4686        let result = svc.confirm_subscription(&req);
4687        assert!(result.is_err(), "Should reject invalid token");
4688    }
4689
4690    #[test]
4691    fn confirm_subscription_matches_correct_pending_sub() {
4692        let (svc, state) = make_sns();
4693        assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "multi-topic")])));
4694
4695        let topic_arn = "arn:aws:sns:us-east-1:123456789012:multi-topic";
4696
4697        // Subscribe two HTTP endpoints (both start as pending)
4698        assert_ok(&svc.subscribe(&sns_request(
4699            "Subscribe",
4700            vec![
4701                ("TopicArn", topic_arn),
4702                ("Protocol", "http"),
4703                ("Endpoint", "http://first.example.com/hook"),
4704            ],
4705        )));
4706        assert_ok(&svc.subscribe(&sns_request(
4707            "Subscribe",
4708            vec![
4709                ("TopicArn", topic_arn),
4710                ("Protocol", "http"),
4711                ("Endpoint", "http://second.example.com/hook"),
4712            ],
4713        )));
4714
4715        // Get the token for the second subscription
4716        let (second_arn, second_token) = {
4717            let s = state.read();
4718            let sub = s
4719                .subscriptions
4720                .values()
4721                .find(|sub| sub.endpoint == "http://second.example.com/hook")
4722                .expect("should have second subscription");
4723            (
4724                sub.subscription_arn.clone(),
4725                sub.confirmation_token.clone().unwrap(),
4726            )
4727        };
4728
4729        // Confirm using the second subscription's token
4730        let req = sns_request(
4731            "ConfirmSubscription",
4732            vec![("TopicArn", topic_arn), ("Token", &second_token)],
4733        );
4734        let result = svc.confirm_subscription(&req);
4735        assert_ok(&result);
4736        let body = response_body(&result);
4737        assert!(
4738            body.contains(&second_arn),
4739            "Should return the second subscription's ARN"
4740        );
4741
4742        // Verify only the second subscription is confirmed
4743        let s = state.read();
4744        for sub in s.subscriptions.values() {
4745            if sub.endpoint == "http://second.example.com/hook" {
4746                assert!(sub.confirmed, "second subscription should be confirmed");
4747            } else {
4748                assert!(!sub.confirmed, "first subscription should still be pending");
4749            }
4750        }
4751    }
4752
4753    #[test]
4754    fn confirm_subscription_accepts_sub_arn_as_token() {
4755        let (svc, state) = make_sns();
4756        assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "arn-token")])));
4757
4758        let topic_arn = "arn:aws:sns:us-east-1:123456789012:arn-token";
4759
4760        // Subscribe an HTTP endpoint (starts as pending)
4761        assert_ok(&svc.subscribe(&sns_request(
4762            "Subscribe",
4763            vec![
4764                ("TopicArn", topic_arn),
4765                ("Protocol", "http"),
4766                ("Endpoint", "http://example.com/hook"),
4767            ],
4768        )));
4769
4770        // Get the subscription ARN
4771        let sub_arn = {
4772            let s = state.read();
4773            s.subscriptions
4774                .values()
4775                .find(|sub| sub.topic_arn == topic_arn)
4776                .expect("should have a subscription")
4777                .subscription_arn
4778                .clone()
4779        };
4780
4781        // Confirm using the subscription ARN as the token (AWS-compatible behavior)
4782        let req = sns_request(
4783            "ConfirmSubscription",
4784            vec![("TopicArn", topic_arn), ("Token", &sub_arn)],
4785        );
4786        let result = svc.confirm_subscription(&req);
4787        assert_ok(&result);
4788
4789        // Verify the subscription is now confirmed
4790        let s = state.read();
4791        let sub = s
4792            .subscriptions
4793            .values()
4794            .find(|sub| sub.topic_arn == topic_arn)
4795            .expect("subscription should exist");
4796        assert!(sub.confirmed, "subscription should be confirmed");
4797    }
4798
4799    // --- CreateTopic / DeleteTopic / ListTopics ---
4800
4801    #[test]
4802    fn create_delete_list_topics_lifecycle() {
4803        let (svc, _state) = make_sns();
4804        // Create two topics
4805        assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "topic1")])));
4806        assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "topic2")])));
4807
4808        // List
4809        let req = sns_request("ListTopics", vec![]);
4810        let body = response_body(&svc.list_topics(&req));
4811        assert_eq!(body.matches("<TopicArn>").count(), 2);
4812
4813        // Delete one
4814        let req = sns_request(
4815            "DeleteTopic",
4816            vec![("TopicArn", "arn:aws:sns:us-east-1:123456789012:topic1")],
4817        );
4818        assert_ok(&svc.delete_topic(&req));
4819
4820        // List again
4821        let req = sns_request("ListTopics", vec![]);
4822        let body = response_body(&svc.list_topics(&req));
4823        assert_eq!(body.matches("<TopicArn>").count(), 1);
4824        assert!(body.contains("topic2"), "topic2 should remain");
4825    }
4826
4827    #[test]
4828    fn create_topic_idempotent() {
4829        let (svc, _state) = make_sns();
4830        let r1 = svc.create_topic(&sns_request("CreateTopic", vec![("Name", "idem-topic")]));
4831        assert_ok(&r1);
4832        let r2 = svc.create_topic(&sns_request("CreateTopic", vec![("Name", "idem-topic")]));
4833        assert_ok(&r2);
4834        let body1 = response_body(&r1);
4835        let body2 = response_body(&r2);
4836        assert_eq!(
4837            body1, body2,
4838            "Creating same topic twice should be idempotent"
4839        );
4840    }
4841
4842    // --- AddPermission / RemovePermission ---
4843
4844    #[test]
4845    fn add_and_remove_permission() {
4846        let (svc, state) = make_sns();
4847        assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "perm-topic")])));
4848
4849        let topic_arn = "arn:aws:sns:us-east-1:123456789012:perm-topic";
4850        let req = sns_request(
4851            "AddPermission",
4852            vec![
4853                ("TopicArn", topic_arn),
4854                ("Label", "MyPermission"),
4855                ("AWSAccountId.member.1", "111111111111"),
4856                ("ActionName.member.1", "Publish"),
4857            ],
4858        );
4859        assert_ok(&svc.add_permission(&req));
4860
4861        // Verify policy has the new statement
4862        {
4863            let s = state.read();
4864            let policy_str = s
4865                .topics
4866                .get(topic_arn)
4867                .unwrap()
4868                .attributes
4869                .get("Policy")
4870                .unwrap();
4871            let policy: Value = serde_json::from_str(policy_str).unwrap();
4872            let stmts = policy["Statement"].as_array().unwrap();
4873            assert!(
4874                stmts
4875                    .iter()
4876                    .any(|s| s["Sid"].as_str() == Some("MyPermission")),
4877                "Policy should contain MyPermission statement"
4878            );
4879        }
4880
4881        // Remove permission
4882        let req = sns_request(
4883            "RemovePermission",
4884            vec![("TopicArn", topic_arn), ("Label", "MyPermission")],
4885        );
4886        assert_ok(&svc.remove_permission(&req));
4887
4888        // Verify removed
4889        {
4890            let s = state.read();
4891            let policy_str = s
4892                .topics
4893                .get(topic_arn)
4894                .unwrap()
4895                .attributes
4896                .get("Policy")
4897                .unwrap();
4898            let policy: Value = serde_json::from_str(policy_str).unwrap();
4899            let stmts = policy["Statement"].as_array().unwrap();
4900            assert!(
4901                !stmts
4902                    .iter()
4903                    .any(|s| s["Sid"].as_str() == Some("MyPermission")),
4904                "MyPermission should be removed"
4905            );
4906        }
4907    }
4908
4909    // --- FIFO topic ---
4910
4911    #[test]
4912    fn publish_to_fifo_topic_requires_group_id() {
4913        let (svc, _state) = make_sns();
4914        let mut req = sns_request("CreateTopic", vec![("Name", "fifo-topic.fifo")]);
4915        req.query_params.insert(
4916            "Attributes.entry.1.key".to_string(),
4917            "FifoTopic".to_string(),
4918        );
4919        req.query_params
4920            .insert("Attributes.entry.1.value".to_string(), "true".to_string());
4921        assert_ok(&svc.create_topic(&req));
4922
4923        let topic_arn = "arn:aws:sns:us-east-1:123456789012:fifo-topic.fifo";
4924        // Publish without MessageGroupId — should fail
4925        let req = sns_request(
4926            "Publish",
4927            vec![("TopicArn", topic_arn), ("Message", "Hello")],
4928        );
4929        let result = svc.publish(&req);
4930        assert!(
4931            result.is_err(),
4932            "FIFO publish without MessageGroupId should error"
4933        );
4934    }
4935
4936    // --- SMS attributes ---
4937
4938    #[test]
4939    fn set_and_get_sms_attributes() {
4940        let (svc, _state) = make_sns();
4941
4942        let mut req = sns_request("SetSMSAttributes", vec![]);
4943        req.query_params.insert(
4944            "attributes.entry.1.key".to_string(),
4945            "DefaultSMSType".to_string(),
4946        );
4947        req.query_params.insert(
4948            "attributes.entry.1.value".to_string(),
4949            "Transactional".to_string(),
4950        );
4951        assert_ok(&svc.set_sms_attributes(&req));
4952
4953        let req = sns_request("GetSMSAttributes", vec![]);
4954        let result = svc.get_sms_attributes(&req);
4955        assert_ok(&result);
4956        let body = response_body(&result);
4957        assert!(
4958            body.contains("DefaultSMSType"),
4959            "Should contain set SMS attribute"
4960        );
4961    }
4962
4963    // --- Phone opt-out ---
4964
4965    #[test]
4966    fn check_phone_opted_out() {
4967        let (svc, state) = make_sns();
4968        state.write().seed_default_opted_out();
4969
4970        let req = sns_request(
4971            "CheckIfPhoneNumberIsOptedOut",
4972            vec![("phoneNumber", "+15005550099")],
4973        );
4974        let result = svc.check_if_phone_number_is_opted_out(&req);
4975        assert_ok(&result);
4976        let body = response_body(&result);
4977        assert!(
4978            body.contains("<isOptedOut>true</isOptedOut>"),
4979            "Seeded number should be opted out"
4980        );
4981    }
4982
4983    #[test]
4984    fn list_phone_numbers_opted_out() {
4985        let (svc, state) = make_sns();
4986        state.write().seed_default_opted_out();
4987
4988        let req = sns_request("ListPhoneNumbersOptedOut", vec![]);
4989        let result = svc.list_phone_numbers_opted_out(&req);
4990        assert_ok(&result);
4991        let body = response_body(&result);
4992        assert!(
4993            body.contains("+15005550099"),
4994            "Should list seeded opted-out number"
4995        );
4996    }
4997
4998    #[test]
4999    fn opt_in_phone_number() {
5000        let (svc, state) = make_sns();
5001        state.write().seed_default_opted_out();
5002
5003        let req = sns_request("OptInPhoneNumber", vec![("phoneNumber", "+15005550099")]);
5004        assert_ok(&svc.opt_in_phone_number(&req));
5005
5006        // Verify removed from opted-out list
5007        let s = state.read();
5008        assert!(
5009            !s.opted_out_numbers.contains(&"+15005550099".to_string()),
5010            "Phone should no longer be opted out"
5011        );
5012    }
5013
5014    // --- Delete topic also removes subscriptions ---
5015
5016    #[test]
5017    fn delete_topic_removes_subscriptions() {
5018        let (svc, state) = make_sns();
5019        assert_ok(&svc.create_topic(&sns_request("CreateTopic", vec![("Name", "del-sub-topic")])));
5020        let topic_arn = "arn:aws:sns:us-east-1:123456789012:del-sub-topic";
5021        assert_ok(&svc.subscribe(&sns_request(
5022            "Subscribe",
5023            vec![
5024                ("TopicArn", topic_arn),
5025                ("Protocol", "email"),
5026                ("Endpoint", "u@example.com"),
5027            ],
5028        )));
5029
5030        assert_eq!(state.read().subscriptions.len(), 1);
5031
5032        assert_ok(&svc.delete_topic(&sns_request("DeleteTopic", vec![("TopicArn", topic_arn)])));
5033        assert_eq!(
5034            state.read().subscriptions.len(),
5035            0,
5036            "Subscriptions should be removed with topic"
5037        );
5038    }
5039
5040    #[test]
5041    fn malformed_filter_policy_does_not_match() {
5042        let sub = SnsSubscription {
5043            subscription_arn: "arn:aws:sns:us-east-1:123456789012:t:sub-1".to_string(),
5044            topic_arn: "arn:aws:sns:us-east-1:123456789012:t".to_string(),
5045            protocol: "sqs".to_string(),
5046            endpoint: "arn:aws:sqs:us-east-1:123456789012:q".to_string(),
5047            owner: "123456789012".to_string(),
5048            attributes: HashMap::from([(
5049                "FilterPolicy".to_string(),
5050                "not valid json {{[".to_string(),
5051            )]),
5052            confirmed: true,
5053            confirmation_token: None,
5054        };
5055        let attrs = HashMap::new();
5056        assert!(
5057            !matches_filter_policy(&sub, &attrs, "hello"),
5058            "malformed FilterPolicy JSON must not match (fail closed)"
5059        );
5060    }
5061}