docbox_core/events/
sqs.rs

1use super::{EventPublisher, TenantEventMessage};
2use aws_sdk_sqs::Client as SqsClient;
3use docbox_database::models::tenant::TenantId;
4use serde::Serialize;
5use tracing::Instrument;
6
7#[derive(Clone)]
8pub struct SqsEventPublisherFactory {
9    client: SqsClient,
10}
11
12impl SqsEventPublisherFactory {
13    pub fn new(client: SqsClient) -> Self {
14        Self { client }
15    }
16
17    pub fn create_event_publisher(&self, target: TenantSqsEventQueue) -> SqsEventPublisher {
18        SqsEventPublisher {
19            client: self.client.clone(),
20            target,
21        }
22    }
23}
24
25/// Tenant event publisher that publishes events through SQS
26#[derive(Clone)]
27pub struct SqsEventPublisher {
28    client: SqsClient,
29    target: TenantSqsEventQueue,
30}
31
32/// Target SQS details queue
33#[derive(Clone)]
34pub struct TenantSqsEventQueue {
35    pub tenant_id: TenantId,
36    pub event_queue_url: String,
37}
38
39/// Container around an event message containing the ID of the
40/// tenant that the message occurred within for multi-tenanted
41/// event handling
42///
43/// i.e { "event": "DOCUMENT_BOX_CREATED", "data": { ...document box data }, "tenant_id": "xxxxx-xxxxx-xxxxx-xxxxx" }
44#[derive(Debug, Serialize)]
45struct TenantEventMessageContainer {
46    tenant_id: TenantId,
47    #[serde(flatten)]
48    message: TenantEventMessage,
49}
50
51impl EventPublisher for SqsEventPublisher {
52    fn publish_event(&self, event: TenantEventMessage) {
53        let client = self.client.clone();
54        let tenant_id = self.target.tenant_id;
55        let event_queue_url = self.target.event_queue_url.clone();
56
57        // Wrap the event message providing the tenant_id
58        let event = TenantEventMessageContainer {
59            message: event,
60            tenant_id,
61        };
62
63        let span = tracing::Span::current();
64
65        tokio::spawn(
66            async move {
67                // Serialize the event message
68                let msg = match serde_json::to_string(&event) {
69                    Ok(value) => value,
70                    Err(cause) => {
71                        tracing::error!(?cause, ?event, "failed to serialize tenant event");
72                        return;
73                    }
74                };
75
76                tracing::debug!(?event, "emitting tenant event");
77
78                // Push the event to the SQS queue
79                if let Err(cause) = client
80                    .send_message()
81                    .queue_url(event_queue_url)
82                    .message_body(msg)
83                    .send()
84                    .await
85                {
86                    tracing::error!(?cause, ?event, "failed to emit tenant event");
87                }
88            }
89            .instrument(span),
90        );
91    }
92}