docbox_core/events/
sqs.rs

1use super::{EventPublisher, TenantEventMessage};
2use aws_sdk_sqs::Client as SqsClient;
3use docbox_database::models::tenant::TenantId;
4use serde::Serialize;
5
6#[derive(Clone)]
7pub struct SqsEventPublisherFactory {
8    client: SqsClient,
9}
10
11impl SqsEventPublisherFactory {
12    pub fn new(client: SqsClient) -> Self {
13        Self { client }
14    }
15
16    pub fn create_event_publisher(&self, target: TenantSqsEventQueue) -> SqsEventPublisher {
17        SqsEventPublisher {
18            client: self.client.clone(),
19            target,
20        }
21    }
22}
23
24/// Tenant event publisher that publishes events through SQS
25#[derive(Clone)]
26pub struct SqsEventPublisher {
27    client: SqsClient,
28    target: TenantSqsEventQueue,
29}
30
31/// Target SQS details queue
32#[derive(Clone)]
33pub struct TenantSqsEventQueue {
34    pub tenant_id: TenantId,
35    pub event_queue_url: String,
36}
37
38/// Container around an event message containing the ID of the
39/// tenant that the message occurred within for multi-tenanted
40/// event handling
41///
42/// i.e { "event": "DOCUMENT_BOX_CREATED", "data": { ...document box data }, "tenant_id": "xxxxx-xxxxx-xxxxx-xxxxx" }
43#[derive(Debug, Serialize)]
44struct TenantEventMessageContainer {
45    tenant_id: TenantId,
46    #[serde(flatten)]
47    message: TenantEventMessage,
48}
49
50impl EventPublisher for SqsEventPublisher {
51    fn publish_event(&self, event: TenantEventMessage) {
52        let client = self.client.clone();
53        let tenant_id = self.target.tenant_id;
54        let event_queue_url = self.target.event_queue_url.clone();
55
56        // Wrap the event message providing the tenant_id
57        let event = TenantEventMessageContainer {
58            message: event,
59            tenant_id,
60        };
61
62        tokio::spawn(async move {
63            // Serialize the event message
64            let msg = match serde_json::to_string(&event) {
65                Ok(value) => value,
66                Err(cause) => {
67                    tracing::error!(?cause, ?event, "failed to serialize tenant event");
68                    return;
69                }
70            };
71
72            tracing::debug!(?event, "emitting tenant event");
73
74            // Push the event to the SQS queue
75            if let Err(cause) = client
76                .send_message()
77                .queue_url(event_queue_url)
78                .message_body(msg)
79                .send()
80                .await
81            {
82                tracing::error!(?cause, ?event, "failed to emit tenant event");
83            }
84        });
85    }
86}