1use crate::iam::{PolicyDocument, RoleRef};
2use crate::intrinsic::{get_arn, get_ref};
3use crate::lambda::{FunctionRef, PermissionBuilder};
4use crate::shared::{Id, TOPIC_POLICY_ID_SUFFIX};
5use crate::sns::{LoggingConfig, SnsSubscriptionProperties, Subscription, SubscriptionDtoType, Topic, TopicPolicy, TopicPolicyProperties, TopicPolicyRef, TopicPolicyType, TopicProperties, TopicRef, TopicType};
6use crate::stack::{Resource, StackBuilder};
7use crate::type_state;
8use crate::wrappers::{ArchivePolicy, LambdaPermissionAction, StringWithOnlyAlphaNumericsUnderscoresAndHyphens, SuccessFeedbackSampleRate, TopicDisplayName};
9use serde_json::{json, Value};
10use std::marker::PhantomData;
11use crate::kms::KeyRef;
12
13const FIFO_SUFFIX: &str = ".fifo";
14
15#[derive(Debug, Clone)]
16pub enum FifoThroughputScope {
17 Topic,
18 MessageGroup
19}
20
21pub enum SubscriptionType<'a> {
22 Lambda(&'a FunctionRef)
23}
24
25impl From<FifoThroughputScope> for String {
26 fn from(value: FifoThroughputScope) -> Self {
27 match value {
28 FifoThroughputScope::Topic => "Topic".to_string(),
29 FifoThroughputScope::MessageGroup => "MessageGroup".to_string(),
30 }
31 }
32}
33
34#[derive(Debug, Clone)]
35pub enum TracingConfig {
36 PassThrough,
37 Active,
38}
39
40impl From<TracingConfig> for String {
41 fn from(value: TracingConfig) -> Self {
42 match value {
43 TracingConfig::PassThrough => "PassThrough".to_string(),
44 TracingConfig::Active => "Active".to_string(),
45 }
46 }
47}
48
49type_state!(
50 TopicBuilderState,
51 StartState,
52 StandardStateWithSubscriptions,
53 FifoState,
54 FifoStateWithSubscriptions,
55);
56
57pub struct TopicBuilder<T: TopicBuilderState> {
86 state: PhantomData<T>,
87 id: Id,
88 topic_name: Option<String>,
89 content_based_deduplication: Option<bool>,
90 fifo_throughput_scope: Option<FifoThroughputScope>,
91 topic_policy_doc: Option<PolicyDocument>,
92 lambda_subscription_ids: Vec<(Id, String)>,
93 archive_policy: Option<String>,
94 display_name: Option<String>,
95 kms_master_key_id: Option<Value>,
96 tracing_config: Option<String>,
97 logging_config: Option<LoggingConfig>,
98}
99
100impl TopicBuilder<StartState> {
101 pub fn new(id: &str) -> Self {
106 Self {
107 state: Default::default(),
108 id: Id(id.to_string()),
109 topic_name: None,
110 content_based_deduplication: None,
111 fifo_throughput_scope: None,
112 topic_policy_doc: None,
113 lambda_subscription_ids: vec![],
114 archive_policy: None,
115 display_name: None,
116 kms_master_key_id: None,
117 tracing_config: None,
118 logging_config: None,
119 }
120 }
121
122 pub fn add_subscription(mut self, subscription: SubscriptionType) -> TopicBuilder<StandardStateWithSubscriptions> {
126 self.add_subscription_internal(subscription);
127
128 TopicBuilder {
129 state: Default::default(),
130 id: self.id,
131 topic_name: self.topic_name,
132 content_based_deduplication: self.content_based_deduplication,
133 fifo_throughput_scope: self.fifo_throughput_scope,
134 topic_policy_doc: self.topic_policy_doc,
135 lambda_subscription_ids: self.lambda_subscription_ids,
136 archive_policy: self.archive_policy,
137 display_name: self.display_name,
138 kms_master_key_id: self.kms_master_key_id,
139 tracing_config: self.tracing_config,
140 logging_config: self.logging_config,
141 }
142 }
143
144 pub fn build(self, stack_builder: &mut StackBuilder) -> TopicRef {
145 self.build_internal(false, stack_builder)
146 }
147}
148
149impl TopicBuilder<StandardStateWithSubscriptions> {
150 pub fn add_subscription(mut self, subscription: SubscriptionType) -> Self {
151 self.add_subscription_internal(subscription);
152 self
153 }
154
155 pub fn build(self, stack_builder: &mut StackBuilder) -> TopicRef {
156 self.build_internal(false, stack_builder)
157 }
158}
159
160impl<T: TopicBuilderState> TopicBuilder<T> {
161 pub fn display_name(self, display_name: TopicDisplayName) -> Self {
162 Self {
163 display_name: Some(display_name.0),
164 ..self
165 }
166 }
167
168 pub fn logging_config(self, logging_config: LoggingConfig) -> Self {
169 Self {
170 logging_config: Some(logging_config),
171 ..self
172 }
173 }
174
175 pub fn kms_master_key(self, kms_key: &KeyRef) -> Self {
176 Self {
177 kms_master_key_id: Some(kms_key.get_ref()),
178 ..self
179 }
180 }
181
182 pub fn tracing_config(self, tracing_config: TracingConfig) -> Self {
183 Self {
184 tracing_config: Some(tracing_config.into()),
185 ..self
186 }
187 }
188
189 pub fn topic_name(self, topic_name: StringWithOnlyAlphaNumericsUnderscoresAndHyphens) -> Self {
190 Self {
191 topic_name: Some(topic_name.0),
192 ..self
193 }
194 }
195
196 pub fn fifo(self) -> TopicBuilder<FifoState> {
197 TopicBuilder {
198 state: Default::default(),
199 id: self.id,
200 topic_name: self.topic_name,
201 content_based_deduplication: self.content_based_deduplication,
202 fifo_throughput_scope: self.fifo_throughput_scope,
203 topic_policy_doc: self.topic_policy_doc,
204 lambda_subscription_ids: self.lambda_subscription_ids,
205 display_name: self.display_name,
206 kms_master_key_id: self.kms_master_key_id,
207 archive_policy: self.archive_policy,
208 tracing_config: self.tracing_config,
209 logging_config: self.logging_config,
210 }
211 }
212
213 pub fn topic_policy(self, doc: PolicyDocument) -> Self {
216 Self {
217 topic_policy_doc: Some(doc),
218 ..self
219 }
220 }
221
222 fn add_subscription_internal(&mut self, subscription: SubscriptionType) {
223 match subscription {
224 SubscriptionType::Lambda(l) => self.lambda_subscription_ids.push((l.get_id().clone(), l.get_resource_id().to_string()))
225 };
226 }
227
228 fn build_internal(self, fifo: bool, stack_builder: &mut StackBuilder) -> TopicRef {
229 let topic_resource_id = Resource::generate_id("SnsTopic");
230
231 self.lambda_subscription_ids.iter().for_each(|(to_subscribe_id, to_subscribe_resource_id)| {
232 let subscription_id = Id::combine_ids(&self.id, to_subscribe_id);
233 let subscription_resource_id = Resource::generate_id("SnsSubscription");
234
235 PermissionBuilder::new(&Id::generate_id(&subscription_id, "Permission"), LambdaPermissionAction("lambda:InvokeFunction".to_string()), get_arn(to_subscribe_resource_id), "sns.amazonaws.com")
236 .source_arn(get_ref(&topic_resource_id))
237 .build(stack_builder);
238
239 let subscription = Subscription {
240 id: subscription_id,
241 resource_id: subscription_resource_id,
242 r#type: SubscriptionDtoType::SubscriptionType,
243 properties: SnsSubscriptionProperties {
244 protocol: "lambda".to_string(),
245 endpoint: get_arn(to_subscribe_resource_id),
246 topic_arn: get_ref(&topic_resource_id),
247 },
248 };
249
250 stack_builder.add_resource(subscription);
251 });
252
253 let archive_policy = if let Some(policy_retention_time) = self.archive_policy {
254 Some(json!({ "MessageRetentionPeriod": policy_retention_time }))
255 } else {
256 None
257 };
258
259 let properties = TopicProperties {
260 topic_name: self.topic_name,
261 fifo_topic: Some(fifo),
262 content_based_deduplication: self.content_based_deduplication,
263 fifo_throughput_scope: self.fifo_throughput_scope.map(Into::into),
264 archive_policy,
265 display_name: self.display_name,
266 kms_master_key_id: self.kms_master_key_id,
267 tracing_config: self.tracing_config,
268 delivery_status_logging: self.logging_config,
269 };
270
271 let topic_ref = TopicRef::internal_new(self.id.clone(), topic_resource_id.to_string());
272
273 if let Some(mut policy) = self.topic_policy_doc {
274 for statement in &mut policy.statements {
275 statement.resource = Some(vec![topic_ref.get_ref()]);
277 }
278 TopicPolicyBuilder::new(Id::generate_id(&self.id, TOPIC_POLICY_ID_SUFFIX), policy, vec![&topic_ref]).build(stack_builder);
279 }
280
281 stack_builder.add_resource(Topic {
282 id: self.id,
283 resource_id: topic_resource_id,
284 r#type: TopicType::TopicType,
285 properties,
286 });
287
288 topic_ref
289 }
290}
291
292impl TopicBuilder<FifoState> {
293 pub fn archive_policy(self, archive_policy: ArchivePolicy) -> Self {
294 Self {
295 archive_policy: Some(archive_policy.0.to_string()),
296 ..self
297 }
298 }
299
300 pub fn fifo_throughput_scope(self, scope: FifoThroughputScope) -> TopicBuilder<FifoState> {
301 Self {
302 fifo_throughput_scope: Some(scope),
303 ..self
304 }
305 }
306
307 pub fn content_based_deduplication(self, content_based_deduplication: bool) -> TopicBuilder<FifoState> {
308 Self {
309 content_based_deduplication: Some(content_based_deduplication),
310 ..self
311 }
312 }
313
314 pub fn add_subscription(mut self, subscription: SubscriptionType) -> TopicBuilder<FifoStateWithSubscriptions> {
315 self.add_subscription_internal(subscription);
316
317 TopicBuilder {
318 id: self.id,
319 state: Default::default(),
320 topic_name: self.topic_name,
321 content_based_deduplication: self.content_based_deduplication,
322 fifo_throughput_scope: self.fifo_throughput_scope,
323 topic_policy_doc: self.topic_policy_doc,
324 lambda_subscription_ids: self.lambda_subscription_ids,
325 display_name: self.display_name,
326 kms_master_key_id: self.kms_master_key_id,
327 archive_policy: self.archive_policy,
328 tracing_config: self.tracing_config,
329 logging_config: self.logging_config,
330 }
331 }
332
333 pub fn build(mut self, stack_builder: &mut StackBuilder) -> TopicRef {
337 if let Some(ref name) = self.topic_name
338 && !name.ends_with(FIFO_SUFFIX) {
339 self.topic_name = Some(format!("{}{}", name, FIFO_SUFFIX));
340 }
341 self.build_internal(true, stack_builder)
342 }
343}
344
345impl TopicBuilder<FifoStateWithSubscriptions> {
346 pub fn fifo_throughput_scope(self, scope: FifoThroughputScope) -> Self {
347 Self {
348 fifo_throughput_scope: Some(scope),
349 ..self
350 }
351 }
352
353 pub fn content_based_deduplication(self, content_based_deduplication: bool) -> Self {
354 Self {
355 content_based_deduplication: Some(content_based_deduplication),
356 ..self
357 }
358 }
359
360 pub fn add_subscription(mut self, subscription: SubscriptionType) -> TopicBuilder<FifoStateWithSubscriptions> {
361 self.add_subscription_internal(subscription);
362 self
363 }
364
365 pub fn build(mut self, stack_builder: &mut StackBuilder) -> TopicRef {
370 if let Some(ref name) = self.topic_name
371 && !name.ends_with(FIFO_SUFFIX) {
372 self.topic_name = Some(format!("{}{}", name, FIFO_SUFFIX));
373 }
374 self.build_internal(true, stack_builder)
375 }
376}
377
378pub enum Protocol {
379 HTTP,
380 SQS,
381 Lambda,
382 Firehose,
383 Application,
384}
385
386impl From<Protocol> for String {
387 fn from(value: Protocol) -> String {
388 match value {
389 Protocol::HTTP => "http".to_string(),
390 Protocol::SQS => "sqs".to_string(),
391 Protocol::Lambda => "lambda".to_string(),
392 Protocol::Firehose => "firehose".to_string(),
393 Protocol::Application => "application".to_string(),
394 }
395 }
396}
397
398pub struct LoggingConfigBuilder {
399 protocol: String,
400 success_feedback_sample_rate: Option<u8>,
401 failure_feedback_role: Option<Value>,
402 success_feedback_role: Option<Value>,
403}
404
405impl LoggingConfigBuilder {
406 pub fn new(protocol: Protocol) -> Self {
407 Self {
408 protocol: protocol.into(),
409 success_feedback_sample_rate: None,
410 failure_feedback_role: None,
411 success_feedback_role: None,
412 }
413 }
414
415 pub fn success_feedback_role(self, role: &RoleRef) -> Self {
416 Self {
417 success_feedback_role: Some(role.get_arn()),
418 ..self
419 }
420 }
421
422 pub fn failure_feedback_role(self, role: &RoleRef) -> Self {
423 Self {
424 failure_feedback_role: Some(role.get_arn()),
425 ..self
426 }
427 }
428
429 pub fn success_feedback_sample_rate(self, success_feedback_sample_rate: SuccessFeedbackSampleRate) -> Self {
430 Self {
431 success_feedback_sample_rate: Some(success_feedback_sample_rate.0),
432 ..self
433 }
434 }
435
436 pub fn build(self) -> LoggingConfig {
437 LoggingConfig {
438 failure_feedback_role_arn: self.failure_feedback_role,
439 protocol: self.protocol,
440 success_feedback_role_arn: self.success_feedback_role,
441 success_feedback_sample_rate: self.success_feedback_sample_rate,
442 }
443 }
444}
445
446pub(crate) struct TopicPolicyBuilder {
447 id: Id,
448 doc: PolicyDocument,
449 topics: Vec<Value>
450}
451
452impl TopicPolicyBuilder {
453 pub(crate) fn new(id: Id, doc: PolicyDocument, topics: Vec<&TopicRef>) -> Self {
455 Self::new_with_values(id, doc, topics.into_iter().map(|v| v.get_ref()).collect())
456 }
457
458 pub(crate) fn new_with_values(id: Id, doc: PolicyDocument, topics: Vec<Value>) -> Self {
459 Self {
460 id,
461 doc,
462 topics,
463 }
464 }
465
466 pub(crate) fn build(self, stack_builder: &mut StackBuilder) -> TopicPolicyRef {
467 let resource_id = Resource::generate_id("TopicPolicy");
468 stack_builder.add_resource(TopicPolicy {
469 id: self.id.clone(),
470 resource_id: resource_id.clone(),
471 r#type: TopicPolicyType::TopicPolicyType,
472 properties: TopicPolicyProperties {
473 doc: self.doc,
474 topics: self.topics,
475 },
476 });
477
478 TopicPolicyRef::internal_new(self.id, resource_id)
479 }
480}
481