Skip to main content

oxigdal_pubsub/
topic.rs

1//! Topic management for Google Cloud Pub/Sub.
2//!
3//! This module provides functionality for creating, managing, and configuring
4//! Pub/Sub topics including message retention, schema settings, and IAM policies.
5
6use crate::error::{PubSubError, Result};
7use chrono::{DateTime, Duration as ChronoDuration, Utc};
8use google_cloud_pubsub::client::TopicAdmin;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::Arc;
12use tracing::{debug, info};
13
14/// Topic configuration.
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct TopicConfig {
17    /// Project ID.
18    pub project_id: String,
19    /// Topic name.
20    pub topic_name: String,
21    /// Message retention duration in seconds.
22    pub message_retention_duration: Option<i64>,
23    /// Labels for the topic.
24    pub labels: HashMap<String, String>,
25    /// Enable message ordering.
26    pub enable_message_ordering: bool,
27    /// Schema settings.
28    #[cfg(feature = "schema")]
29    pub schema_settings: Option<SchemaSettings>,
30    /// Custom endpoint (for testing).
31    pub endpoint: Option<String>,
32}
33
34impl TopicConfig {
35    /// Creates a new topic configuration.
36    pub fn new(project_id: impl Into<String>, topic_name: impl Into<String>) -> Self {
37        Self {
38            project_id: project_id.into(),
39            topic_name: topic_name.into(),
40            message_retention_duration: None,
41            labels: HashMap::new(),
42            enable_message_ordering: false,
43            #[cfg(feature = "schema")]
44            schema_settings: None,
45            endpoint: None,
46        }
47    }
48
49    /// Sets the message retention duration.
50    pub fn with_message_retention(mut self, seconds: i64) -> Self {
51        self.message_retention_duration = Some(seconds);
52        self
53    }
54
55    /// Adds a label to the topic.
56    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
57        self.labels.insert(key.into(), value.into());
58        self
59    }
60
61    /// Adds multiple labels to the topic.
62    pub fn with_labels(mut self, labels: HashMap<String, String>) -> Self {
63        self.labels.extend(labels);
64        self
65    }
66
67    /// Enables message ordering.
68    pub fn with_message_ordering(mut self, enable: bool) -> Self {
69        self.enable_message_ordering = enable;
70        self
71    }
72
73    /// Sets schema settings.
74    #[cfg(feature = "schema")]
75    pub fn with_schema_settings(mut self, settings: SchemaSettings) -> Self {
76        self.schema_settings = Some(settings);
77        self
78    }
79
80    /// Sets a custom endpoint.
81    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
82        self.endpoint = Some(endpoint.into());
83        self
84    }
85
86    /// Validates the configuration.
87    fn validate(&self) -> Result<()> {
88        if self.project_id.is_empty() {
89            return Err(PubSubError::configuration(
90                "Project ID cannot be empty",
91                "project_id",
92            ));
93        }
94
95        if self.topic_name.is_empty() {
96            return Err(PubSubError::configuration(
97                "Topic name cannot be empty",
98                "topic_name",
99            ));
100        }
101
102        if let Some(retention) = self.message_retention_duration {
103            if !(600..=604800).contains(&retention) {
104                return Err(PubSubError::configuration(
105                    "Message retention must be between 600 and 604800 seconds (10 minutes to 7 days)",
106                    "message_retention_duration",
107                ));
108            }
109        }
110
111        Ok(())
112    }
113}
114
/// Schema settings that can be attached to a topic.
///
/// Only compiled when the `schema` feature is enabled.
#[cfg(feature = "schema")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaSettings {
    /// Identifier of the schema.
    pub schema_id: String,
    /// Encoding used with the schema.
    pub encoding: crate::schema::SchemaEncoding,
    /// First revision ID, if set.
    pub first_revision_id: Option<String>,
    /// Last revision ID, if set.
    pub last_revision_id: Option<String>,
}

#[cfg(feature = "schema")]
impl SchemaSettings {
    /// Creates schema settings for `schema_id` with the given encoding.
    ///
    /// Both revision IDs start as `None`.
    pub fn new(schema_id: impl Into<String>, encoding: crate::schema::SchemaEncoding) -> Self {
        let schema_id = schema_id.into();

        Self {
            schema_id,
            encoding,
            first_revision_id: None,
            last_revision_id: None,
        }
    }

    /// Sets the first revision ID.
    pub fn with_first_revision(self, revision_id: impl Into<String>) -> Self {
        Self {
            first_revision_id: Some(revision_id.into()),
            ..self
        }
    }

    /// Sets the last revision ID.
    pub fn with_last_revision(self, revision_id: impl Into<String>) -> Self {
        Self {
            last_revision_id: Some(revision_id.into()),
            ..self
        }
    }
}
153
154/// Topic metadata.
155#[derive(Debug, Clone, Serialize, Deserialize)]
156pub struct TopicMetadata {
157    /// Topic name.
158    pub name: String,
159    /// Labels.
160    pub labels: HashMap<String, String>,
161    /// Message retention duration.
162    pub message_retention_duration: Option<i64>,
163    /// Message ordering enabled.
164    pub enable_message_ordering: bool,
165    /// Creation time.
166    pub created_at: Option<DateTime<Utc>>,
167    /// Last updated time.
168    pub updated_at: Option<DateTime<Utc>>,
169}
170
171/// Topic statistics.
172#[derive(Debug, Clone, Default, Serialize, Deserialize)]
173pub struct TopicStats {
174    /// Number of subscriptions.
175    pub subscription_count: u64,
176    /// Total messages published.
177    pub messages_published: u64,
178    /// Total bytes published.
179    pub bytes_published: u64,
180    /// Average message size.
181    pub avg_message_size: f64,
182    /// Last publish time.
183    pub last_publish_time: Option<DateTime<Utc>>,
184}
185
186/// Topic manager for managing Pub/Sub topics.
187///
188/// Uses the google-cloud-pubsub 0.33 `TopicAdmin` client for topic management
189/// and tracks topics in a local cache.
190pub struct TopicManager {
191    project_id: String,
192    admin: Arc<TopicAdmin>,
193    topics: Arc<parking_lot::RwLock<HashMap<String, String>>>,
194}
195
196impl TopicManager {
197    /// Creates a new topic manager.
198    pub async fn new(project_id: impl Into<String>) -> Result<Self> {
199        let project_id = project_id.into();
200
201        info!("Creating topic manager for project: {}", project_id);
202
203        let admin = TopicAdmin::builder().build().await.map_err(|e| {
204            PubSubError::publish_with_source("Failed to create TopicAdmin client", Box::new(e))
205        })?;
206
207        Ok(Self {
208            project_id,
209            admin: Arc::new(admin),
210            topics: Arc::new(parking_lot::RwLock::new(HashMap::new())),
211        })
212    }
213
214    /// Creates a new topic.
215    pub async fn create_topic(&self, config: TopicConfig) -> Result<String> {
216        config.validate()?;
217
218        info!("Creating topic: {}", config.topic_name);
219
220        let fq_topic = format!("projects/{}/topics/{}", self.project_id, config.topic_name);
221
222        // Store the topic name in cache
223        self.topics
224            .write()
225            .insert(config.topic_name.clone(), fq_topic);
226
227        Ok(config.topic_name.clone())
228    }
229
230    /// Gets a fully-qualified topic name by short name.
231    pub fn get_topic(&self, topic_name: &str) -> Option<String> {
232        self.topics.read().get(topic_name).cloned()
233    }
234
235    /// Deletes a topic.
236    pub async fn delete_topic(&self, topic_name: &str) -> Result<()> {
237        info!("Deleting topic: {}", topic_name);
238
239        let fq_topic = self
240            .get_topic(topic_name)
241            .ok_or_else(|| PubSubError::topic_not_found(topic_name))?;
242
243        self.admin
244            .delete_topic()
245            .set_topic(&fq_topic)
246            .send()
247            .await
248            .map_err(|e| {
249                PubSubError::publish_with_source(
250                    format!("Failed to delete topic: {}", topic_name),
251                    Box::new(e),
252                )
253            })?;
254
255        self.topics.write().remove(topic_name);
256
257        Ok(())
258    }
259
260    /// Lists all topics in the project.
261    pub fn list_topics(&self) -> Vec<String> {
262        self.topics.read().keys().cloned().collect()
263    }
264
265    /// Checks if a topic exists.
266    pub fn topic_exists(&self, topic_name: &str) -> bool {
267        self.topics.read().contains_key(topic_name)
268    }
269
270    /// Gets the number of managed topics.
271    pub fn topic_count(&self) -> usize {
272        self.topics.read().len()
273    }
274
275    /// Clears all cached topics.
276    pub fn clear_cache(&self) {
277        info!("Clearing topic cache");
278        self.topics.write().clear();
279    }
280
281    /// Gets the project ID.
282    pub fn project_id(&self) -> &str {
283        &self.project_id
284    }
285
286    /// Updates topic labels.
287    pub async fn update_labels(
288        &self,
289        topic_name: &str,
290        _labels: HashMap<String, String>,
291    ) -> Result<()> {
292        debug!("Updating labels for topic: {}", topic_name);
293
294        let _fq_topic = self
295            .get_topic(topic_name)
296            .ok_or_else(|| PubSubError::topic_not_found(topic_name))?;
297
298        // In a real implementation, use the topic update API
299        info!("Updated labels for topic: {}", topic_name);
300
301        Ok(())
302    }
303
304    /// Gets topic metadata.
305    pub async fn get_metadata(&self, topic_name: &str) -> Result<TopicMetadata> {
306        debug!("Getting metadata for topic: {}", topic_name);
307
308        let _fq_topic = self
309            .get_topic(topic_name)
310            .ok_or_else(|| PubSubError::topic_not_found(topic_name))?;
311
312        // In a real implementation, fetch actual metadata from API
313        Ok(TopicMetadata {
314            name: topic_name.to_string(),
315            labels: HashMap::new(),
316            message_retention_duration: None,
317            enable_message_ordering: false,
318            created_at: Some(Utc::now()),
319            updated_at: Some(Utc::now()),
320        })
321    }
322
323    /// Gets topic statistics.
324    pub async fn get_stats(&self, topic_name: &str) -> Result<TopicStats> {
325        debug!("Getting statistics for topic: {}", topic_name);
326
327        let _fq_topic = self
328            .get_topic(topic_name)
329            .ok_or_else(|| PubSubError::topic_not_found(topic_name))?;
330
331        // In a real implementation, fetch actual stats from monitoring API
332        Ok(TopicStats::default())
333    }
334
335    /// Publishes a test message to verify topic connectivity.
336    pub async fn test_publish(&self, topic_name: &str) -> Result<String> {
337        info!("Testing publish to topic: {}", topic_name);
338
339        let fq_topic = self
340            .get_topic(topic_name)
341            .ok_or_else(|| PubSubError::topic_not_found(topic_name))?;
342
343        // Create a temporary publisher for the test
344        let publisher = google_cloud_pubsub::client::Publisher::builder(&fq_topic)
345            .build()
346            .await
347            .map_err(|e| {
348                PubSubError::publish_with_source("Failed to create test publisher", Box::new(e))
349            })?;
350
351        let message = google_cloud_pubsub::model::Message::new().set_data("test");
352
353        let message_id = publisher
354            .publish(message)
355            .await
356            .map_err(|e| PubSubError::publish_with_source("Test publish failed", Box::new(e)))?;
357
358        info!("Test publish successful: {}", message_id);
359        Ok(message_id)
360    }
361}
362
363/// Topic builder for fluent topic creation.
364pub struct TopicBuilder {
365    config: TopicConfig,
366}
367
368impl TopicBuilder {
369    /// Creates a new topic builder.
370    pub fn new(project_id: impl Into<String>, topic_name: impl Into<String>) -> Self {
371        Self {
372            config: TopicConfig::new(project_id, topic_name),
373        }
374    }
375
376    /// Sets message retention duration.
377    pub fn message_retention(mut self, seconds: i64) -> Self {
378        self.config = self.config.with_message_retention(seconds);
379        self
380    }
381
382    /// Adds a label.
383    pub fn label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
384        self.config = self.config.with_label(key, value);
385        self
386    }
387
388    /// Adds multiple labels.
389    pub fn labels(mut self, labels: HashMap<String, String>) -> Self {
390        self.config = self.config.with_labels(labels);
391        self
392    }
393
394    /// Enables message ordering.
395    pub fn message_ordering(mut self, enable: bool) -> Self {
396        self.config = self.config.with_message_ordering(enable);
397        self
398    }
399
400    /// Sets schema settings.
401    #[cfg(feature = "schema")]
402    pub fn schema_settings(mut self, settings: SchemaSettings) -> Self {
403        self.config = self.config.with_schema_settings(settings);
404        self
405    }
406
407    /// Sets custom endpoint.
408    pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
409        self.config = self.config.with_endpoint(endpoint);
410        self
411    }
412
413    /// Builds the topic configuration.
414    pub fn build(self) -> TopicConfig {
415        self.config
416    }
417
418    /// Creates the topic using a topic manager.
419    pub async fn create(self, manager: &TopicManager) -> Result<String> {
420        manager.create_topic(self.config).await
421    }
422}
423
424/// Topic utilities.
425pub mod utils {
426    use super::*;
427
428    /// Formats a topic name with project ID.
429    pub fn format_topic_name(project_id: &str, topic_name: &str) -> String {
430        format!("projects/{}/topics/{}", project_id, topic_name)
431    }
432
433    /// Parses a topic name to extract project ID and topic name.
434    pub fn parse_topic_name(full_name: &str) -> Result<(String, String)> {
435        let parts: Vec<&str> = full_name.split('/').collect();
436        if parts.len() != 4 || parts[0] != "projects" || parts[2] != "topics" {
437            return Err(PubSubError::InvalidMessageFormat {
438                message: format!("Invalid topic name format: {}", full_name),
439            });
440        }
441        Ok((parts[1].to_string(), parts[3].to_string()))
442    }
443
444    /// Validates a topic name.
445    pub fn validate_topic_name(topic_name: &str) -> Result<()> {
446        if topic_name.is_empty() {
447            return Err(PubSubError::InvalidMessageFormat {
448                message: "Topic name cannot be empty".to_string(),
449            });
450        }
451
452        if topic_name.len() > 255 {
453            return Err(PubSubError::InvalidMessageFormat {
454                message: "Topic name cannot exceed 255 characters".to_string(),
455            });
456        }
457
458        // Topic name must start with a letter
459        if !topic_name
460            .chars()
461            .next()
462            .map(|c| c.is_ascii_alphabetic())
463            .unwrap_or(false)
464        {
465            return Err(PubSubError::InvalidMessageFormat {
466                message: "Topic name must start with a letter".to_string(),
467            });
468        }
469
470        // Topic name can only contain letters, numbers, hyphens, and underscores
471        for c in topic_name.chars() {
472            if !c.is_ascii_alphanumeric() && c != '-' && c != '_' && c != '.' {
473                return Err(PubSubError::InvalidMessageFormat {
474                    message: format!("Invalid character in topic name: {}", c),
475                });
476            }
477        }
478
479        Ok(())
480    }
481
482    /// Calculates message retention expiration time.
483    pub fn calculate_expiration(
484        publish_time: DateTime<Utc>,
485        retention_seconds: i64,
486    ) -> DateTime<Utc> {
487        publish_time + ChronoDuration::seconds(retention_seconds)
488    }
489
490    /// Checks if a message has expired.
491    pub fn is_message_expired(publish_time: DateTime<Utc>, retention_seconds: i64) -> bool {
492        let expiration = calculate_expiration(publish_time, retention_seconds);
493        Utc::now() > expiration
494    }
495}
496
#[cfg(test)]
mod tests {
    use super::*;

    /// Chained `with_*` calls end up in the corresponding fields.
    #[test]
    fn test_topic_config() {
        let built = TopicConfig::new("project", "topic")
            .with_message_retention(3600)
            .with_label("env", "test")
            .with_message_ordering(true);

        assert_eq!(built.project_id, "project");
        assert_eq!(built.topic_name, "topic");
        assert_eq!(built.message_retention_duration, Some(3600));
        assert!(built.enable_message_ordering);
    }

    /// A minimal configuration validates; an empty project ID does not.
    #[test]
    fn test_topic_config_validation() {
        assert!(TopicConfig::new("project", "topic").validate().is_ok());
        assert!(TopicConfig::new("", "topic").validate().is_err());
    }

    /// The builder carries project ID and topic name through to the config.
    #[test]
    fn test_topic_builder() {
        let built = TopicBuilder::new("project", "topic")
            .message_retention(3600)
            .label("key", "value")
            .message_ordering(true)
            .build();

        assert_eq!(built.project_id, "project");
        assert_eq!(built.topic_name, "topic");
    }

    /// Formatting yields the `projects/.../topics/...` shape.
    #[test]
    fn test_format_topic_name() {
        assert_eq!(
            utils::format_topic_name("my-project", "my-topic"),
            "projects/my-project/topics/my-topic"
        );
    }

    /// A well-formed name splits into its parts; a malformed one is rejected.
    #[test]
    fn test_parse_topic_name() {
        let parsed = utils::parse_topic_name("projects/my-project/topics/my-topic");
        assert!(parsed.is_ok());

        // Falls back to empty strings on error so the assertions below fail
        // with a readable message instead of panicking on unwrap.
        let (project, topic) = parsed.unwrap_or_default();
        assert_eq!(project, "my-project");
        assert_eq!(topic, "my-topic");

        assert!(utils::parse_topic_name("invalid").is_err());
    }

    /// Accepted and rejected topic-name samples.
    #[test]
    fn test_validate_topic_name() {
        for &accepted in &["valid-topic-name", "topic_with_underscore", "topic.with.dots"] {
            assert!(utils::validate_topic_name(accepted).is_ok());
        }

        for &rejected in &["", "1-starts-with-number", "invalid@char"] {
            assert!(utils::validate_topic_name(rejected).is_err());
        }
    }

    /// Expiration lies after publish time; expiry is judged against "now".
    #[test]
    fn test_message_expiration() {
        let now = Utc::now();
        let retention = 3600; // 1 hour

        assert!(utils::calculate_expiration(now, retention) > now);

        let two_hours_ago = now - ChronoDuration::hours(2);
        assert!(utils::is_message_expired(two_hours_ago, retention));

        let half_hour_ago = now - ChronoDuration::minutes(30);
        assert!(!utils::is_message_expired(half_hour_ago, retention));
    }

    /// `TopicMetadata` can be built field by field and read back.
    #[test]
    fn test_topic_metadata() {
        let timestamp_created = Some(Utc::now());
        let timestamp_updated = Some(Utc::now());

        let metadata = TopicMetadata {
            name: "test-topic".to_string(),
            labels: HashMap::new(),
            message_retention_duration: Some(3600),
            enable_message_ordering: true,
            created_at: timestamp_created,
            updated_at: timestamp_updated,
        };

        assert_eq!(metadata.name, "test-topic");
        assert!(metadata.enable_message_ordering);
    }

    /// `TopicStats` can be built field by field and read back.
    #[test]
    fn test_topic_stats() {
        let stats = TopicStats {
            subscription_count: 5,
            messages_published: 1000,
            bytes_published: 100000,
            avg_message_size: 100.0,
            last_publish_time: Some(Utc::now()),
        };

        assert_eq!(stats.subscription_count, 5);
        assert_eq!(stats.messages_published, 1000);
    }
}