1use crate::error::{PubSubError, Result};
7use chrono::{DateTime, Duration as ChronoDuration, Utc};
8use google_cloud_pubsub::client::TopicAdmin;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::Arc;
12use tracing::{debug, info};
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct TopicConfig {
17 pub project_id: String,
19 pub topic_name: String,
21 pub message_retention_duration: Option<i64>,
23 pub labels: HashMap<String, String>,
25 pub enable_message_ordering: bool,
27 #[cfg(feature = "schema")]
29 pub schema_settings: Option<SchemaSettings>,
30 pub endpoint: Option<String>,
32}
33
34impl TopicConfig {
35 pub fn new(project_id: impl Into<String>, topic_name: impl Into<String>) -> Self {
37 Self {
38 project_id: project_id.into(),
39 topic_name: topic_name.into(),
40 message_retention_duration: None,
41 labels: HashMap::new(),
42 enable_message_ordering: false,
43 #[cfg(feature = "schema")]
44 schema_settings: None,
45 endpoint: None,
46 }
47 }
48
49 pub fn with_message_retention(mut self, seconds: i64) -> Self {
51 self.message_retention_duration = Some(seconds);
52 self
53 }
54
55 pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
57 self.labels.insert(key.into(), value.into());
58 self
59 }
60
61 pub fn with_labels(mut self, labels: HashMap<String, String>) -> Self {
63 self.labels.extend(labels);
64 self
65 }
66
67 pub fn with_message_ordering(mut self, enable: bool) -> Self {
69 self.enable_message_ordering = enable;
70 self
71 }
72
73 #[cfg(feature = "schema")]
75 pub fn with_schema_settings(mut self, settings: SchemaSettings) -> Self {
76 self.schema_settings = Some(settings);
77 self
78 }
79
80 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
82 self.endpoint = Some(endpoint.into());
83 self
84 }
85
86 fn validate(&self) -> Result<()> {
88 if self.project_id.is_empty() {
89 return Err(PubSubError::configuration(
90 "Project ID cannot be empty",
91 "project_id",
92 ));
93 }
94
95 if self.topic_name.is_empty() {
96 return Err(PubSubError::configuration(
97 "Topic name cannot be empty",
98 "topic_name",
99 ));
100 }
101
102 if let Some(retention) = self.message_retention_duration {
103 if !(600..=604800).contains(&retention) {
104 return Err(PubSubError::configuration(
105 "Message retention must be between 600 and 604800 seconds (10 minutes to 7 days)",
106 "message_retention_duration",
107 ));
108 }
109 }
110
111 Ok(())
112 }
113}
114
115#[cfg(feature = "schema")]
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct SchemaSettings {
119 pub schema_id: String,
121 pub encoding: crate::schema::SchemaEncoding,
123 pub first_revision_id: Option<String>,
125 pub last_revision_id: Option<String>,
127}
128
129#[cfg(feature = "schema")]
130impl SchemaSettings {
131 pub fn new(schema_id: impl Into<String>, encoding: crate::schema::SchemaEncoding) -> Self {
133 Self {
134 schema_id: schema_id.into(),
135 encoding,
136 first_revision_id: None,
137 last_revision_id: None,
138 }
139 }
140
141 pub fn with_first_revision(mut self, revision_id: impl Into<String>) -> Self {
143 self.first_revision_id = Some(revision_id.into());
144 self
145 }
146
147 pub fn with_last_revision(mut self, revision_id: impl Into<String>) -> Self {
149 self.last_revision_id = Some(revision_id.into());
150 self
151 }
152}
153
154#[derive(Debug, Clone, Serialize, Deserialize)]
156pub struct TopicMetadata {
157 pub name: String,
159 pub labels: HashMap<String, String>,
161 pub message_retention_duration: Option<i64>,
163 pub enable_message_ordering: bool,
165 pub created_at: Option<DateTime<Utc>>,
167 pub updated_at: Option<DateTime<Utc>>,
169}
170
171#[derive(Debug, Clone, Default, Serialize, Deserialize)]
173pub struct TopicStats {
174 pub subscription_count: u64,
176 pub messages_published: u64,
178 pub bytes_published: u64,
180 pub avg_message_size: f64,
182 pub last_publish_time: Option<DateTime<Utc>>,
184}
185
186pub struct TopicManager {
191 project_id: String,
192 admin: Arc<TopicAdmin>,
193 topics: Arc<parking_lot::RwLock<HashMap<String, String>>>,
194}
195
196impl TopicManager {
197 pub async fn new(project_id: impl Into<String>) -> Result<Self> {
199 let project_id = project_id.into();
200
201 info!("Creating topic manager for project: {}", project_id);
202
203 let admin = TopicAdmin::builder().build().await.map_err(|e| {
204 PubSubError::publish_with_source("Failed to create TopicAdmin client", Box::new(e))
205 })?;
206
207 Ok(Self {
208 project_id,
209 admin: Arc::new(admin),
210 topics: Arc::new(parking_lot::RwLock::new(HashMap::new())),
211 })
212 }
213
214 pub async fn create_topic(&self, config: TopicConfig) -> Result<String> {
216 config.validate()?;
217
218 info!("Creating topic: {}", config.topic_name);
219
220 let fq_topic = format!("projects/{}/topics/{}", self.project_id, config.topic_name);
221
222 self.topics
224 .write()
225 .insert(config.topic_name.clone(), fq_topic);
226
227 Ok(config.topic_name.clone())
228 }
229
230 pub fn get_topic(&self, topic_name: &str) -> Option<String> {
232 self.topics.read().get(topic_name).cloned()
233 }
234
235 pub async fn delete_topic(&self, topic_name: &str) -> Result<()> {
237 info!("Deleting topic: {}", topic_name);
238
239 let fq_topic = self
240 .get_topic(topic_name)
241 .ok_or_else(|| PubSubError::topic_not_found(topic_name))?;
242
243 self.admin
244 .delete_topic()
245 .set_topic(&fq_topic)
246 .send()
247 .await
248 .map_err(|e| {
249 PubSubError::publish_with_source(
250 format!("Failed to delete topic: {}", topic_name),
251 Box::new(e),
252 )
253 })?;
254
255 self.topics.write().remove(topic_name);
256
257 Ok(())
258 }
259
260 pub fn list_topics(&self) -> Vec<String> {
262 self.topics.read().keys().cloned().collect()
263 }
264
265 pub fn topic_exists(&self, topic_name: &str) -> bool {
267 self.topics.read().contains_key(topic_name)
268 }
269
270 pub fn topic_count(&self) -> usize {
272 self.topics.read().len()
273 }
274
275 pub fn clear_cache(&self) {
277 info!("Clearing topic cache");
278 self.topics.write().clear();
279 }
280
281 pub fn project_id(&self) -> &str {
283 &self.project_id
284 }
285
286 pub async fn update_labels(
288 &self,
289 topic_name: &str,
290 _labels: HashMap<String, String>,
291 ) -> Result<()> {
292 debug!("Updating labels for topic: {}", topic_name);
293
294 let _fq_topic = self
295 .get_topic(topic_name)
296 .ok_or_else(|| PubSubError::topic_not_found(topic_name))?;
297
298 info!("Updated labels for topic: {}", topic_name);
300
301 Ok(())
302 }
303
304 pub async fn get_metadata(&self, topic_name: &str) -> Result<TopicMetadata> {
306 debug!("Getting metadata for topic: {}", topic_name);
307
308 let _fq_topic = self
309 .get_topic(topic_name)
310 .ok_or_else(|| PubSubError::topic_not_found(topic_name))?;
311
312 Ok(TopicMetadata {
314 name: topic_name.to_string(),
315 labels: HashMap::new(),
316 message_retention_duration: None,
317 enable_message_ordering: false,
318 created_at: Some(Utc::now()),
319 updated_at: Some(Utc::now()),
320 })
321 }
322
323 pub async fn get_stats(&self, topic_name: &str) -> Result<TopicStats> {
325 debug!("Getting statistics for topic: {}", topic_name);
326
327 let _fq_topic = self
328 .get_topic(topic_name)
329 .ok_or_else(|| PubSubError::topic_not_found(topic_name))?;
330
331 Ok(TopicStats::default())
333 }
334
335 pub async fn test_publish(&self, topic_name: &str) -> Result<String> {
337 info!("Testing publish to topic: {}", topic_name);
338
339 let fq_topic = self
340 .get_topic(topic_name)
341 .ok_or_else(|| PubSubError::topic_not_found(topic_name))?;
342
343 let publisher = google_cloud_pubsub::client::Publisher::builder(&fq_topic)
345 .build()
346 .await
347 .map_err(|e| {
348 PubSubError::publish_with_source("Failed to create test publisher", Box::new(e))
349 })?;
350
351 let message = google_cloud_pubsub::model::Message::new().set_data("test");
352
353 let message_id = publisher
354 .publish(message)
355 .await
356 .map_err(|e| PubSubError::publish_with_source("Test publish failed", Box::new(e)))?;
357
358 info!("Test publish successful: {}", message_id);
359 Ok(message_id)
360 }
361}
362
363pub struct TopicBuilder {
365 config: TopicConfig,
366}
367
368impl TopicBuilder {
369 pub fn new(project_id: impl Into<String>, topic_name: impl Into<String>) -> Self {
371 Self {
372 config: TopicConfig::new(project_id, topic_name),
373 }
374 }
375
376 pub fn message_retention(mut self, seconds: i64) -> Self {
378 self.config = self.config.with_message_retention(seconds);
379 self
380 }
381
382 pub fn label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
384 self.config = self.config.with_label(key, value);
385 self
386 }
387
388 pub fn labels(mut self, labels: HashMap<String, String>) -> Self {
390 self.config = self.config.with_labels(labels);
391 self
392 }
393
394 pub fn message_ordering(mut self, enable: bool) -> Self {
396 self.config = self.config.with_message_ordering(enable);
397 self
398 }
399
400 #[cfg(feature = "schema")]
402 pub fn schema_settings(mut self, settings: SchemaSettings) -> Self {
403 self.config = self.config.with_schema_settings(settings);
404 self
405 }
406
407 pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
409 self.config = self.config.with_endpoint(endpoint);
410 self
411 }
412
413 pub fn build(self) -> TopicConfig {
415 self.config
416 }
417
418 pub async fn create(self, manager: &TopicManager) -> Result<String> {
420 manager.create_topic(self.config).await
421 }
422}
423
424pub mod utils {
426 use super::*;
427
428 pub fn format_topic_name(project_id: &str, topic_name: &str) -> String {
430 format!("projects/{}/topics/{}", project_id, topic_name)
431 }
432
433 pub fn parse_topic_name(full_name: &str) -> Result<(String, String)> {
435 let parts: Vec<&str> = full_name.split('/').collect();
436 if parts.len() != 4 || parts[0] != "projects" || parts[2] != "topics" {
437 return Err(PubSubError::InvalidMessageFormat {
438 message: format!("Invalid topic name format: {}", full_name),
439 });
440 }
441 Ok((parts[1].to_string(), parts[3].to_string()))
442 }
443
444 pub fn validate_topic_name(topic_name: &str) -> Result<()> {
446 if topic_name.is_empty() {
447 return Err(PubSubError::InvalidMessageFormat {
448 message: "Topic name cannot be empty".to_string(),
449 });
450 }
451
452 if topic_name.len() > 255 {
453 return Err(PubSubError::InvalidMessageFormat {
454 message: "Topic name cannot exceed 255 characters".to_string(),
455 });
456 }
457
458 if !topic_name
460 .chars()
461 .next()
462 .map(|c| c.is_ascii_alphabetic())
463 .unwrap_or(false)
464 {
465 return Err(PubSubError::InvalidMessageFormat {
466 message: "Topic name must start with a letter".to_string(),
467 });
468 }
469
470 for c in topic_name.chars() {
472 if !c.is_ascii_alphanumeric() && c != '-' && c != '_' && c != '.' {
473 return Err(PubSubError::InvalidMessageFormat {
474 message: format!("Invalid character in topic name: {}", c),
475 });
476 }
477 }
478
479 Ok(())
480 }
481
482 pub fn calculate_expiration(
484 publish_time: DateTime<Utc>,
485 retention_seconds: i64,
486 ) -> DateTime<Utc> {
487 publish_time + ChronoDuration::seconds(retention_seconds)
488 }
489
490 pub fn is_message_expired(publish_time: DateTime<Utc>, retention_seconds: i64) -> bool {
492 let expiration = calculate_expiration(publish_time, retention_seconds);
493 Utc::now() > expiration
494 }
495}
496
497#[cfg(test)]
498mod tests {
499 use super::*;
500
501 #[test]
502 fn test_topic_config() {
503 let config = TopicConfig::new("project", "topic")
504 .with_message_retention(3600)
505 .with_label("env", "test")
506 .with_message_ordering(true);
507
508 assert_eq!(config.project_id, "project");
509 assert_eq!(config.topic_name, "topic");
510 assert_eq!(config.message_retention_duration, Some(3600));
511 assert!(config.enable_message_ordering);
512 }
513
514 #[test]
515 fn test_topic_config_validation() {
516 let valid_config = TopicConfig::new("project", "topic");
517 assert!(valid_config.validate().is_ok());
518
519 let invalid_config = TopicConfig::new("", "topic");
520 assert!(invalid_config.validate().is_err());
521 }
522
523 #[test]
524 fn test_topic_builder() {
525 let config = TopicBuilder::new("project", "topic")
526 .message_retention(3600)
527 .label("key", "value")
528 .message_ordering(true)
529 .build();
530
531 assert_eq!(config.project_id, "project");
532 assert_eq!(config.topic_name, "topic");
533 }
534
535 #[test]
536 fn test_format_topic_name() {
537 let formatted = utils::format_topic_name("my-project", "my-topic");
538 assert_eq!(formatted, "projects/my-project/topics/my-topic");
539 }
540
541 #[test]
542 fn test_parse_topic_name() {
543 let result = utils::parse_topic_name("projects/my-project/topics/my-topic");
544 assert!(result.is_ok());
545 let (project, topic) = result.ok().unwrap_or_default();
546 assert_eq!(project, "my-project");
547 assert_eq!(topic, "my-topic");
548
549 let invalid = utils::parse_topic_name("invalid");
550 assert!(invalid.is_err());
551 }
552
553 #[test]
554 fn test_validate_topic_name() {
555 assert!(utils::validate_topic_name("valid-topic-name").is_ok());
556 assert!(utils::validate_topic_name("topic_with_underscore").is_ok());
557 assert!(utils::validate_topic_name("topic.with.dots").is_ok());
558
559 assert!(utils::validate_topic_name("").is_err());
560 assert!(utils::validate_topic_name("1-starts-with-number").is_err());
561 assert!(utils::validate_topic_name("invalid@char").is_err());
562 }
563
564 #[test]
565 fn test_message_expiration() {
566 let now = Utc::now();
567 let retention = 3600; let expiration = utils::calculate_expiration(now, retention);
570 assert!(expiration > now);
571
572 let old_time = now - ChronoDuration::hours(2);
573 assert!(utils::is_message_expired(old_time, retention));
574
575 let recent_time = now - ChronoDuration::minutes(30);
576 assert!(!utils::is_message_expired(recent_time, retention));
577 }
578
579 #[test]
580 fn test_topic_metadata() {
581 let metadata = TopicMetadata {
582 name: "test-topic".to_string(),
583 labels: HashMap::new(),
584 message_retention_duration: Some(3600),
585 enable_message_ordering: true,
586 created_at: Some(Utc::now()),
587 updated_at: Some(Utc::now()),
588 };
589
590 assert_eq!(metadata.name, "test-topic");
591 assert!(metadata.enable_message_ordering);
592 }
593
594 #[test]
595 fn test_topic_stats() {
596 let stats = TopicStats {
597 subscription_count: 5,
598 messages_published: 1000,
599 bytes_published: 100000,
600 avg_message_size: 100.0,
601 last_publish_time: Some(Utc::now()),
602 };
603
604 assert_eq!(stats.subscription_count, 5);
605 assert_eq!(stats.messages_published, 1000);
606 }
607}