1use crate::iam::PolicyDocument;
2use crate::intrinsic::{get_arn, get_ref};
3use crate::lambda::{FunctionRef, PermissionBuilder};
4use crate::shared::{Id, TOPIC_POLICY_ID_SUFFIX};
5use crate::sns::{SnsSubscriptionProperties, Subscription, Topic, TopicPolicy, TopicPolicyProperties, TopicPolicyRef, TopicProperties, TopicRef};
6use crate::stack::{Resource, StackBuilder};
7use crate::type_state;
8use crate::wrappers::{LambdaPermissionAction, StringWithOnlyAlphaNumericsUnderscoresAndHyphens};
9use serde_json::Value;
10use std::marker::PhantomData;
11
/// Suffix that FIFO builders append to an explicitly set topic name when the
/// name does not already end with it.
const FIFO_SUFFIX: &str = ".fifo";
13
/// Throughput scope that can be configured on a FIFO topic builder.
///
/// The value is turned into its string form (`"Topic"` / `"MessageGroup"`)
/// when the topic properties are assembled.
#[derive(Debug, Clone)]
pub enum FifoThroughputScope {
    /// Rendered as `"Topic"`.
    Topic,
    /// Rendered as `"MessageGroup"`.
    MessageGroup,
}
19
20pub enum SubscriptionType<'a> {
21 Lambda(&'a FunctionRef)
22}
23
24impl From<FifoThroughputScope> for String {
25 fn from(value: FifoThroughputScope) -> Self {
26 match value {
27 FifoThroughputScope::Topic => "Topic".to_string(),
28 FifoThroughputScope::MessageGroup => "MessageGroup".to_string(),
29 }
30 }
31}
32
// Type-state markers for `TopicBuilder`. `TopicBuilderState` is the bound on the
// builder's type parameter; the remaining names are the individual states the
// builder moves through (plain start, standard with subscriptions, FIFO, and
// FIFO with subscriptions).
// NOTE(review): presumably `type_state!` declares the trait and one marker type
// per listed state — confirm against the macro definition.
type_state!(
    TopicBuilderState,
    StartState,
    StandardStateWithSubscriptions,
    FifoState,
    FifoStateWithSubscriptions,
);
40
41pub struct TopicBuilder<T: TopicBuilderState> {
70 state: PhantomData<T>,
71 id: Id,
72 topic_name: Option<String>,
73 content_based_deduplication: Option<bool>,
74 fifo_throughput_scope: Option<FifoThroughputScope>,
75 topic_policy_doc: Option<PolicyDocument>,
76 lambda_subscription_ids: Vec<(Id, String)>,
77}
78
79impl TopicBuilder<StartState> {
80 pub fn new(id: &str) -> Self {
85 Self {
86 state: Default::default(),
87 id: Id(id.to_string()),
88 topic_name: None,
89 content_based_deduplication: None,
90 fifo_throughput_scope: None,
91 topic_policy_doc: None,
92 lambda_subscription_ids: vec![],
93 }
94 }
95
96 pub fn add_subscription(mut self, subscription: SubscriptionType) -> TopicBuilder<StandardStateWithSubscriptions> {
100 self.add_subscription_internal(subscription);
101
102 TopicBuilder {
103 state: Default::default(),
104 id: self.id,
105 topic_name: self.topic_name,
106 content_based_deduplication: self.content_based_deduplication,
107 fifo_throughput_scope: self.fifo_throughput_scope,
108 topic_policy_doc: self.topic_policy_doc,
109 lambda_subscription_ids: self.lambda_subscription_ids,
110 }
111 }
112
113 pub fn build(self, stack_builder: &mut StackBuilder) -> TopicRef {
114 self.build_internal(false, stack_builder)
115 }
116}
117
118impl TopicBuilder<StandardStateWithSubscriptions> {
119 pub fn add_subscription(mut self, subscription: SubscriptionType) -> TopicBuilder<StandardStateWithSubscriptions> {
120 self.add_subscription_internal(subscription);
121
122 TopicBuilder {
123 state: Default::default(),
124 id: self.id,
125 topic_name: self.topic_name,
126 content_based_deduplication: self.content_based_deduplication,
127 fifo_throughput_scope: self.fifo_throughput_scope,
128 topic_policy_doc: self.topic_policy_doc,
129 lambda_subscription_ids: self.lambda_subscription_ids,
130 }
131 }
132
133 pub fn build(self, stack_builder: &mut StackBuilder) -> TopicRef {
134 self.build_internal(false, stack_builder)
135 }
136}
137
138impl<T: TopicBuilderState> TopicBuilder<T> {
139 pub fn topic_name(self, topic_name: StringWithOnlyAlphaNumericsUnderscoresAndHyphens) -> TopicBuilder<T> {
140 TopicBuilder {
141 topic_name: Some(topic_name.0),
142 id: self.id,
143 state: Default::default(),
144 content_based_deduplication: self.content_based_deduplication,
145 fifo_throughput_scope: self.fifo_throughput_scope,
146 topic_policy_doc: self.topic_policy_doc,
147 lambda_subscription_ids: self.lambda_subscription_ids,
148 }
149 }
150
151 pub fn fifo(self) -> TopicBuilder<FifoState> {
152 TopicBuilder {
153 state: Default::default(),
154 id: self.id,
155 topic_name: self.topic_name,
156 content_based_deduplication: self.content_based_deduplication,
157 fifo_throughput_scope: self.fifo_throughput_scope,
158 topic_policy_doc: self.topic_policy_doc,
159 lambda_subscription_ids: self.lambda_subscription_ids,
160 }
161 }
162
163 pub fn topic_policy(self, doc: PolicyDocument) -> TopicBuilder<T> {
166 TopicBuilder {
167 topic_policy_doc: Some(doc),
168 topic_name: self.topic_name,
169 id: self.id,
170 state: Default::default(),
171 content_based_deduplication: self.content_based_deduplication,
172 fifo_throughput_scope: self.fifo_throughput_scope,
173 lambda_subscription_ids: self.lambda_subscription_ids,
174 }
175 }
176
177 fn add_subscription_internal(&mut self, subscription: SubscriptionType) {
178 match subscription {
179 SubscriptionType::Lambda(l) => self.lambda_subscription_ids.push((l.get_id().clone(), l.get_resource_id().to_string()))
180 };
181 }
182
183 fn build_internal(self, fifo: bool, stack_builder: &mut StackBuilder) -> TopicRef {
184 let topic_resource_id = Resource::generate_id("SnsTopic");
185
186 self.lambda_subscription_ids.iter().for_each(|(to_subscribe_id, to_subscribe_resource_id)| {
187 let subscription_id = Id::combine_ids(&self.id, to_subscribe_id);
188 let subscription_resource_id = Resource::generate_id("SnsSubscription");
189
190 PermissionBuilder::new(&Id::generate_id(&subscription_id, "Permission"), LambdaPermissionAction("lambda:InvokeFunction".to_string()), get_arn(to_subscribe_resource_id), "sns.amazonaws.com")
191 .source_arn(get_ref(&topic_resource_id))
192 .build(stack_builder);
193
194 let subscription = Subscription {
195 id: subscription_id,
196 resource_id: subscription_resource_id,
197 r#type: "AWS::SNS::Subscription".to_string(),
198 properties: SnsSubscriptionProperties {
199 protocol: "lambda".to_string(),
200 endpoint: get_arn(to_subscribe_resource_id),
201 topic_arn: get_ref(&topic_resource_id),
202 },
203 };
204
205 stack_builder.add_resource(subscription);
206 });
207
208 let properties = TopicProperties {
209 topic_name: self.topic_name,
210 fifo_topic: Some(fifo),
211 content_based_deduplication: self.content_based_deduplication,
212 fifo_throughput_scope: self.fifo_throughput_scope.map(Into::into),
213 };
214
215 let topic_ref = TopicRef::new(self.id.clone(), topic_resource_id.to_string());
216
217 if let Some(mut policy) = self.topic_policy_doc {
218 for statement in &mut policy.statements {
219 statement.resource = Some(vec![topic_ref.get_ref()]);
221 }
222 TopicPolicyBuilder::new(Id::generate_id(&self.id, TOPIC_POLICY_ID_SUFFIX), policy, vec![&topic_ref]).build(stack_builder);
223 }
224
225 stack_builder.add_resource(Topic {
226 id: self.id,
227 resource_id: topic_resource_id,
228 r#type: "AWS::SNS::Topic".to_string(),
229 properties,
230 });
231
232 topic_ref
233 }
234}
235
236impl TopicBuilder<FifoState> {
237 pub fn fifo_throughput_scope(self, scope: FifoThroughputScope) -> TopicBuilder<FifoState> {
238 Self {
239 fifo_throughput_scope: Some(scope),
240 ..self
241 }
242 }
243
244 pub fn content_based_deduplication(self, content_based_deduplication: bool) -> TopicBuilder<FifoState> {
245 Self {
246 content_based_deduplication: Some(content_based_deduplication),
247 ..self
248 }
249 }
250
251 pub fn add_subscription(mut self, subscription: SubscriptionType) -> TopicBuilder<FifoStateWithSubscriptions> {
252 self.add_subscription_internal(subscription);
253
254 TopicBuilder {
255 state: Default::default(),
256 id: self.id,
257 topic_name: self.topic_name,
258 content_based_deduplication: self.content_based_deduplication,
259 fifo_throughput_scope: self.fifo_throughput_scope,
260 topic_policy_doc: self.topic_policy_doc,
261 lambda_subscription_ids: self.lambda_subscription_ids,
262 }
263 }
264
265 pub fn build(mut self, stack_builder: &mut StackBuilder) -> TopicRef {
269 if let Some(ref name) = self.topic_name
270 && !name.ends_with(FIFO_SUFFIX) {
271 self.topic_name = Some(format!("{}{}", name, FIFO_SUFFIX));
272 }
273 self.build_internal(true, stack_builder)
274 }
275}
276
277impl TopicBuilder<FifoStateWithSubscriptions> {
278 pub fn fifo_throughput_scope(self, scope: FifoThroughputScope) -> TopicBuilder<FifoStateWithSubscriptions> {
279 Self {
280 fifo_throughput_scope: Some(scope),
281 ..self
282 }
283 }
284
285 pub fn content_based_deduplication(self, content_based_deduplication: bool) -> TopicBuilder<FifoStateWithSubscriptions> {
286 Self {
287 content_based_deduplication: Some(content_based_deduplication),
288 ..self
289 }
290 }
291
292 pub fn add_subscription(mut self, subscription: SubscriptionType) -> TopicBuilder<FifoStateWithSubscriptions> {
293 self.add_subscription_internal(subscription);
294
295 TopicBuilder {
296 state: Default::default(),
297 id: self.id,
298 topic_name: self.topic_name,
299 content_based_deduplication: self.content_based_deduplication,
300 fifo_throughput_scope: self.fifo_throughput_scope,
301 topic_policy_doc: self.topic_policy_doc,
302 lambda_subscription_ids: self.lambda_subscription_ids,
303 }
304 }
305
306 pub fn build(mut self, stack_builder: &mut StackBuilder) -> TopicRef {
311 if let Some(ref name) = self.topic_name
312 && !name.ends_with(FIFO_SUFFIX) {
313 self.topic_name = Some(format!("{}{}", name, FIFO_SUFFIX));
314 }
315 self.build_internal(true, stack_builder)
316 }
317}
318
319pub(crate) struct TopicPolicyBuilder {
320 id: Id,
321 doc: PolicyDocument,
322 topics: Vec<Value>
323}
324
325impl TopicPolicyBuilder {
326 pub(crate) fn new(id: Id, doc: PolicyDocument, topics: Vec<&TopicRef>) -> Self {
328 Self::new_with_values(id, doc, topics.into_iter().map(|v| v.get_ref()).collect())
329 }
330
331 pub(crate) fn new_with_values(id: Id, doc: PolicyDocument, topics: Vec<Value>) -> Self {
332 Self {
333 id,
334 doc,
335 topics,
336 }
337 }
338
339 pub(crate) fn build(self, stack_builder: &mut StackBuilder) -> TopicPolicyRef {
340 let resource_id = Resource::generate_id("TopicPolicy");
341 stack_builder.add_resource(TopicPolicy {
342 id: self.id.clone(),
343 resource_id: resource_id.clone(),
344 r#type: "AWS::SNS::TopicPolicy".to_string(),
345 properties: TopicPolicyProperties {
346 doc: self.doc,
347 topics: self.topics,
348 },
349 });
350
351 TopicPolicyRef::new(self.id, resource_id)
352 }
353}
354