1use std::sync::Arc;
2
3use crate::helpers::gcp_config;
4use google_cloud_auth::credentials::CredentialsFile;
5use google_cloud_googleapis::pubsub::v1::PubsubMessage;
6use google_cloud_pubsub::client::{Client, ClientConfig};
7use google_cloud_pubsub::publisher::Publisher;
8use google_cloud_pubsub::subscription::{Subscription, SubscriptionConfig};
9
10use log::{debug, error, info};
11use serde::Serialize;
12
13pub struct PubSubsStuff {
14 pub publishers: Arc<[(String, Publisher)]>,
15 pub subscriptions: Arc<[(String, Subscription)]>,
16}
17
18impl PubSubsStuff {
19 pub async fn new(
20 project_id: Option<String>,
21 instance_id: &str,
22 topics: Arc<[&'static str]>,
23 subs: Arc<[&'static str]>,
24 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
25 info!("Initializing PubSub client");
26
27 let key_file_path = gcp_config::credentials_path_from_env().map_err(|e| {
28 let err: Box<dyn std::error::Error + Send + Sync> = e.into();
29 err
30 })?;
31
32 let project_id = gcp_config::resolve_project_id(project_id)
33 .await
34 .map_err(|e| {
35 let err: Box<dyn std::error::Error + Send + Sync> = e.into();
36 err
37 })?;
38
39 info!("Using project_id: '{}'", project_id);
40
41 let expanded_topics: Vec<(String, &str)> = topics
43 .iter()
44 .map(|name| {
45 (
46 format!("projects/{}/topics/{}-{}", project_id, name, instance_id),
47 *name,
48 )
49 })
50 .collect();
51
52 let expanded_subs: Vec<(String, &str)> = subs
54 .iter()
55 .map(|name| {
56 (
57 format!("projects/{}/subscriptions/{}", project_id, name),
58 *name,
59 )
60 })
61 .collect();
62
63 let credentials = CredentialsFile::new_from_file(key_file_path).await?;
64 let config = ClientConfig::default()
65 .with_credentials(credentials)
66 .await?;
67 let client = Client::new(config).await?;
68
69 let mut publishers_vec = Vec::with_capacity(expanded_topics.len());
72
73 for (topic_path, name) in expanded_topics.iter() {
74 let publisher = client.topic(topic_path).new_publisher(None);
75 publishers_vec.push((name.to_string(), publisher));
76 debug!("Created publisher '{}'", name);
77 }
78
79 let publishers: Arc<[(String, Publisher)]> = Arc::from(publishers_vec);
80
81 let mut subscriptions_vec = Vec::with_capacity(expanded_subs.len());
84
85 for (sub_path, name) in expanded_subs.iter() {
86 let sub_config = SubscriptionConfig {
87 push_config: None,
88 ack_deadline_seconds: 10,
89 retain_acked_messages: false,
90 message_retention_duration: None,
91 labels: Default::default(),
92 enable_message_ordering: true,
93 expiration_policy: None,
94 filter: String::new(),
95 dead_letter_policy: None,
96 retry_policy: None,
97 detached: false,
98 topic_message_retention_duration: None,
99 enable_exactly_once_delivery: false,
100 bigquery_config: None,
101 state: 0,
102 cloud_storage_config: None,
103 };
104
105 let subscription = match client
106 .create_subscription(sub_path, "", sub_config, None)
107 .await
108 {
109 Ok(sub) => sub,
110 Err(err) => {
111 error!(
112 "Failed to create subscription '{}': {:?}. Falling back.",
113 name, err
114 );
115 client.subscription(sub_path)
116 }
117 };
118
119 subscriptions_vec.push((name.to_string(), subscription));
120 debug!("Created subscription '{}'", name);
121 }
122
123 let subscriptions: Arc<[(String, Subscription)]> = Arc::from(subscriptions_vec);
124
125 info!("PubSub client initialized successfully");
126
127 Ok(Self {
128 publishers,
129 subscriptions,
130 })
131 }
132
133 pub fn get_publisher(&self, name: &str) -> Option<Publisher> {
136 self.publishers
137 .iter()
138 .find(|(n, _)| n == name)
139 .map(|(_, p)| p.clone())
140 }
141
142 pub fn get_subscription(&self, name: &str) -> Option<Subscription> {
143 self.subscriptions
144 .iter()
145 .find(|(n, _)| n == name)
146 .map(|(_, s)| s.clone())
147 }
148
149 pub fn create_message<T: Serialize>(
152 &self,
153 payload: T,
154 ordering_key: Option<String>,
155 ) -> Result<PubsubMessage, serde_json::Error> {
156 let data = serde_json::to_vec(&payload)?;
157
158 Ok(PubsubMessage {
159 data,
160 attributes: Default::default(),
161 ordering_key: ordering_key.unwrap_or_default(),
162 message_id: String::new(),
163 publish_time: None,
164 })
165 }
166
167 pub async fn publish_fire_and_forget<T: Serialize + Send + 'static>(
168 &self,
169 topic: &str,
170 payload: T,
171 ordering_key: Option<String>,
172 ) {
173 let publisher = self.get_publisher(topic);
174 let topic_name = topic.to_string();
175
176 tokio::spawn(async move {
177 match publisher {
178 Some(publisher) => match serde_json::to_vec(&payload) {
179 Ok(data) => {
180 let message = PubsubMessage {
181 data,
182 attributes: Default::default(),
183 ordering_key: ordering_key.unwrap_or_default(),
184 message_id: String::new(),
185 publish_time: None,
186 };
187 publisher.publish(message).await;
188 debug!("Message published to '{}'", topic_name);
189 }
190 Err(e) => error!("Failed to serialize payload: {:?}", e),
191 },
192 None => error!("Publisher '{}' not found", topic_name),
193 }
194 });
195 }
196}
197
198pub async fn create_pubsub_client(
199 project_id: Option<String>,
200 instance_id: &str,
201 topics: Arc<[&'static str]>,
202 subs: Arc<[&'static str]>,
203) -> Result<PubSubsStuff, Box<dyn std::error::Error + Send + Sync>> {
204 PubSubsStuff::new(project_id, instance_id, topics, subs).await
205}