gcp_rust_tools/
pubsub.rs

1use std::sync::Arc;
2
3use crate::helpers::gcp_config;
4use google_cloud_auth::credentials::CredentialsFile;
5use google_cloud_googleapis::pubsub::v1::PubsubMessage;
6use google_cloud_pubsub::client::{Client, ClientConfig};
7use google_cloud_pubsub::publisher::Publisher;
8use google_cloud_pubsub::subscription::{Subscription, SubscriptionConfig};
9
10use log::{debug, error, info};
11use serde::Serialize;
12
13pub struct PubSubsStuff {
14    pub publishers: Arc<[(String, Publisher)]>,
15    pub subscriptions: Arc<[(String, Subscription)]>,
16}
17
18impl PubSubsStuff {
19    pub async fn new(
20        project_id: Option<String>,
21        instance_id: &str,
22        topics: Arc<[&'static str]>,
23        subs: Arc<[&'static str]>,
24    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
25        info!("Initializing PubSub client");
26
27        let key_file_path = gcp_config::credentials_path_from_env().map_err(|e| {
28            let err: Box<dyn std::error::Error + Send + Sync> = e.into();
29            err
30        })?;
31
32        let project_id = gcp_config::resolve_project_id(project_id)
33            .await
34            .map_err(|e| {
35                let err: Box<dyn std::error::Error + Send + Sync> = e.into();
36                err
37            })?;
38
39        info!("Using project_id: '{}'", project_id);
40
41        // Expand topic names into full topic paths
42        let expanded_topics: Vec<(String, &str)> = topics
43            .iter()
44            .map(|name| {
45                (
46                    format!("projects/{}/topics/{}-{}", project_id, name, instance_id),
47                    *name,
48                )
49            })
50            .collect();
51
52        // Expand subscription names into full subscription paths
53        let expanded_subs: Vec<(String, &str)> = subs
54            .iter()
55            .map(|name| {
56                (
57                    format!("projects/{}/subscriptions/{}", project_id, name),
58                    *name,
59                )
60            })
61            .collect();
62
63        let credentials = CredentialsFile::new_from_file(key_file_path).await?;
64        let config = ClientConfig::default()
65            .with_credentials(credentials)
66            .await?;
67        let client = Client::new(config).await?;
68
69        /* ---------- Publishers (build → freeze) ---------- */
70
71        let mut publishers_vec = Vec::with_capacity(expanded_topics.len());
72
73        for (topic_path, name) in expanded_topics.iter() {
74            let publisher = client.topic(topic_path).new_publisher(None);
75            publishers_vec.push((name.to_string(), publisher));
76            debug!("Created publisher '{}'", name);
77        }
78
79        let publishers: Arc<[(String, Publisher)]> = Arc::from(publishers_vec);
80
81        /* ---------- Subscriptions (build → freeze) ---------- */
82
83        let mut subscriptions_vec = Vec::with_capacity(expanded_subs.len());
84
85        for (sub_path, name) in expanded_subs.iter() {
86            let sub_config = SubscriptionConfig {
87                push_config: None,
88                ack_deadline_seconds: 10,
89                retain_acked_messages: false,
90                message_retention_duration: None,
91                labels: Default::default(),
92                enable_message_ordering: true,
93                expiration_policy: None,
94                filter: String::new(),
95                dead_letter_policy: None,
96                retry_policy: None,
97                detached: false,
98                topic_message_retention_duration: None,
99                enable_exactly_once_delivery: false,
100                bigquery_config: None,
101                state: 0,
102                cloud_storage_config: None,
103            };
104
105            let subscription = match client
106                .create_subscription(sub_path, "", sub_config, None)
107                .await
108            {
109                Ok(sub) => sub,
110                Err(err) => {
111                    error!(
112                        "Failed to create subscription '{}': {:?}. Falling back.",
113                        name, err
114                    );
115                    client.subscription(sub_path)
116                }
117            };
118
119            subscriptions_vec.push((name.to_string(), subscription));
120            debug!("Created subscription '{}'", name);
121        }
122
123        let subscriptions: Arc<[(String, Subscription)]> = Arc::from(subscriptions_vec);
124
125        info!("PubSub client initialized successfully");
126
127        Ok(Self {
128            publishers,
129            subscriptions,
130        })
131    }
132
133    /* ---------- Lookups ---------- */
134
135    pub fn get_publisher(&self, name: &str) -> Option<Publisher> {
136        self.publishers
137            .iter()
138            .find(|(n, _)| n == name)
139            .map(|(_, p)| p.clone())
140    }
141
142    pub fn get_subscription(&self, name: &str) -> Option<Subscription> {
143        self.subscriptions
144            .iter()
145            .find(|(n, _)| n == name)
146            .map(|(_, s)| s.clone())
147    }
148
149    /* ---------- Message helpers ---------- */
150
151    pub fn create_message<T: Serialize>(
152        &self,
153        payload: T,
154        ordering_key: Option<String>,
155    ) -> Result<PubsubMessage, serde_json::Error> {
156        let data = serde_json::to_vec(&payload)?;
157
158        Ok(PubsubMessage {
159            data,
160            attributes: Default::default(),
161            ordering_key: ordering_key.unwrap_or_default(),
162            message_id: String::new(),
163            publish_time: None,
164        })
165    }
166
167    pub async fn publish_fire_and_forget<T: Serialize + Send + 'static>(
168        &self,
169        topic: &str,
170        payload: T,
171        ordering_key: Option<String>,
172    ) {
173        let publisher = self.get_publisher(topic);
174        let topic_name = topic.to_string();
175
176        tokio::spawn(async move {
177            match publisher {
178                Some(publisher) => match serde_json::to_vec(&payload) {
179                    Ok(data) => {
180                        let message = PubsubMessage {
181                            data,
182                            attributes: Default::default(),
183                            ordering_key: ordering_key.unwrap_or_default(),
184                            message_id: String::new(),
185                            publish_time: None,
186                        };
187                        publisher.publish(message).await;
188                        debug!("Message published to '{}'", topic_name);
189                    }
190                    Err(e) => error!("Failed to serialize payload: {:?}", e),
191                },
192                None => error!("Publisher '{}' not found", topic_name),
193            }
194        });
195    }
196}
197
198pub async fn create_pubsub_client(
199    project_id: Option<String>,
200    instance_id: &str,
201    topics: Arc<[&'static str]>,
202    subs: Arc<[&'static str]>,
203) -> Result<PubSubsStuff, Box<dyn std::error::Error + Send + Sync>> {
204    PubSubsStuff::new(project_id, instance_id, topics, subs).await
205}