docbox_core/events/
sqs.rs1use super::{EventPublisher, TenantEventMessage};
2use aws_sdk_sqs::Client as SqsClient;
3use docbox_database::models::tenant::TenantId;
4use serde::Serialize;
5use tracing::Instrument;
6
7#[derive(Clone)]
8pub struct SqsEventPublisherFactory {
9 client: SqsClient,
10}
11
12impl SqsEventPublisherFactory {
13 pub fn new(client: SqsClient) -> Self {
14 Self { client }
15 }
16
17 pub fn create_event_publisher(&self, target: TenantSqsEventQueue) -> SqsEventPublisher {
18 SqsEventPublisher {
19 client: self.client.clone(),
20 target,
21 }
22 }
23}
24
25#[derive(Clone)]
27pub struct SqsEventPublisher {
28 client: SqsClient,
29 target: TenantSqsEventQueue,
30}
31
32#[derive(Clone)]
34pub struct TenantSqsEventQueue {
35 pub tenant_id: TenantId,
36 pub event_queue_url: String,
37}
38
39#[derive(Debug, Serialize)]
45struct TenantEventMessageContainer {
46 tenant_id: TenantId,
47 #[serde(flatten)]
48 message: TenantEventMessage,
49}
50
51impl EventPublisher for SqsEventPublisher {
52 fn publish_event(&self, event: TenantEventMessage) {
53 let client = self.client.clone();
54 let tenant_id = self.target.tenant_id;
55 let event_queue_url = self.target.event_queue_url.clone();
56
57 let event = TenantEventMessageContainer {
59 message: event,
60 tenant_id,
61 };
62
63 let span = tracing::Span::current();
64
65 tokio::spawn(
66 async move {
67 let msg = match serde_json::to_string(&event) {
69 Ok(value) => value,
70 Err(cause) => {
71 tracing::error!(?cause, ?event, "failed to serialize tenant event");
72 return;
73 }
74 };
75
76 tracing::debug!(?event, "emitting tenant event");
77
78 if let Err(cause) = client
80 .send_message()
81 .queue_url(event_queue_url)
82 .message_body(msg)
83 .send()
84 .await
85 {
86 tracing::error!(?cause, ?event, "failed to emit tenant event");
87 }
88 }
89 .instrument(span),
90 );
91 }
92}