pulsar_admin/
persistent_topics.rs

1use std::collections::HashMap;
2use std::error::Error;
3
4use serde::{Deserialize, Serialize};
5
6use crate::inner_http_client::InnerHttpClient;
7use crate::url_constants::URL_PERSISTENT;
8
/// Handle for the persistent-topic admin endpoints.
///
/// Holds a borrowed [`InnerHttpClient`] for the lifetime `'a`; every request
/// made by the methods on this type is sent through that client.
pub struct PersistentTopics<'a> {
    inner_http_client: &'a InnerHttpClient,
}
12
13#[derive(Serialize, Deserialize, Debug)]
14pub struct TopicStats {
15    #[serde(rename = "msgRateIn")]
16    msg_rate_in: f64,
17    #[serde(rename = "msgThroughputIn")]
18    msg_throughput_in: f64,
19    #[serde(rename = "msgRateOut")]
20    msg_rate_out: f64,
21    #[serde(rename = "msgThroughputOut")]
22    msg_throughput_out: f64,
23    #[serde(rename = "bytesInCounter")]
24    bytes_in_counter: u64,
25    #[serde(rename = "msgInCounter")]
26    msg_in_counter: u64,
27    #[serde(rename = "bytesOutCounter")]
28    bytes_out_counter: u64,
29    #[serde(rename = "msgOutCounter")]
30    msg_out_counter: u64,
31    #[serde(rename = "averageMsgSize")]
32    average_msg_size: f64,
33    #[serde(rename = "msgChunkPublished")]
34    msg_chunk_published: bool,
35    #[serde(rename = "storageSize")]
36    storage_size: u64,
37    #[serde(rename = "backlogSize")]
38    backlog_size: u64,
39    #[serde(rename = "publishRateLimitedTimes")]
40    publish_rate_limited_times: u64,
41    #[serde(rename = "earliestMsgPublishTimeInBacklogs")]
42    earliest_msg_publish_time_in_backlogs: u64,
43    #[serde(rename = "offloadedStorageSize")]
44    offloaded_storage_size: u64,
45    #[serde(rename = "lastOffloadLedgerId")]
46    last_offload_ledger_id: u64,
47    #[serde(rename = "lastOffloadSuccessTimeStamp")]
48    last_offload_success_time_stamp: u64,
49    #[serde(rename = "lastOffloadFailureTimeStamp")]
50    last_offload_failure_time_stamp: u64,
51    #[serde(rename = "ongoingTxnCount")]
52    ongoing_txn_count: Option<u64>,
53    #[serde(rename = "abortedTxnCount")]
54    aborted_txn_count: Option<u64>,
55    #[serde(rename = "committedTxnCount")]
56    committed_txn_count: Option<u64>,
57    #[serde(rename = "publishers")]
58    publishers: Vec<String>,
59    #[serde(rename = "waitingPublishers")]
60    waiting_publishers: u64,
61    #[serde(rename = "subscriptions")]
62    subscriptions: HashMap<String, String>,
63    #[serde(rename = "replication")]
64    replication: HashMap<String, String>,
65    #[serde(rename = "deduplicationStatus")]
66    deduplication_status: String,
67    #[serde(rename = "nonContiguousDeletedMessagesRanges")]
68    non_contiguous_deleted_messages_ranges: u64,
69    #[serde(rename = "nonContiguousDeletedMessagesRangesSerializedSize")]
70    non_contiguous_deleted_messages_ranges_serialized_size: u64,
71    #[serde(rename = "delayedMessageIndexSizeInBytes")]
72    delayed_message_index_size_in_bytes: Option<u64>,
73    #[serde(rename = "compaction")]
74    compaction: Compaction,
75    #[serde(rename = "ownerBroker")]
76    owner_broker: Option<String>,
77}
78
79#[derive(Serialize, Deserialize, Debug)]
80pub struct Compaction {
81    #[serde(rename = "lastCompactionRemovedEventCount")]
82    last_compaction_removed_event_count: u64,
83    #[serde(rename = "lastCompactionSucceedTimestamp")]
84    last_compaction_succeed_timestamp: u64,
85    #[serde(rename = "lastCompactionFailedTimestamp")]
86    last_compaction_failed_timestamp: u64,
87    #[serde(rename = "lastCompactionDurationTimeInMills")]
88    last_compaction_duration_time_in_mills: u64,
89}
90
91impl<'a> PersistentTopics<'a> {
92    pub fn new(inner_http_client: &'a InnerHttpClient) -> Self {
93        PersistentTopics { inner_http_client }
94    }
95
96    pub async fn create_non_partitioned_topic(
97        &self,
98        tenant: &str,
99        namespace: &str,
100        topic: &str,
101    ) -> Result<(), Box<dyn Error>> {
102        let url_path = format!("{}/{}/{}/{}", URL_PERSISTENT, tenant, namespace, topic);
103        self.inner_http_client.put(url_path.as_str(), "".to_string()).await
104    }
105
106    pub async fn delete_non_partitioned_topic(
107        &self,
108        tenant: &str,
109        namespace: &str,
110        topic: &str,
111    ) -> Result<(), Box<dyn Error>> {
112        let url_path = format!("{}/{}/{}/{}", URL_PERSISTENT, tenant, namespace, topic);
113        self.inner_http_client.delete(url_path.as_str()).await
114    }
115
116    pub async fn list_non_partitioned_topic(
117        &self,
118        tenant: &str,
119        namespace: &str,
120    ) -> Result<Vec<String>, Box<dyn Error>> {
121        let url_path = format!("{}/{}/{}", URL_PERSISTENT, tenant, namespace);
122        let response = self.inner_http_client.get(url_path.as_str()).await?;
123        let topics: Vec<String> = serde_json::from_str(&response)?;
124        Ok(topics)
125    }
126
127    pub async fn create_partitioned_topic(
128        &self,
129        tenant: &str,
130        namespace: &str,
131        topic: &str,
132        num_partitions: i32,
133    ) -> Result<(), Box<dyn Error>> {
134        let url_path = format!("{}/{}/{}/{}/partitions", URL_PERSISTENT, tenant, namespace, topic);
135        self.inner_http_client.put(url_path.as_str(), num_partitions.to_string()).await
136    }
137
138    pub async fn delete_partitioned_topic(
139        &self,
140        tenant: &str,
141        namespace: &str,
142        topic: &str,
143    ) -> Result<(), Box<dyn Error>> {
144        let url_path = format!("{}/{}/{}/{}/partitions", URL_PERSISTENT, tenant, namespace, topic);
145        self.inner_http_client.delete(url_path.as_str()).await
146    }
147
148    pub async fn list_partitioned_topic(
149        &self,
150        tenant: &str,
151        namespace: &str,
152    ) -> Result<Vec<String>, Box<dyn Error>> {
153        let url_path = format!("{}/{}/{}/partitioned", URL_PERSISTENT, tenant, namespace);
154        let response = self.inner_http_client.get(url_path.as_str()).await?;
155        let topics: Vec<String> = serde_json::from_str(&response)?;
156        Ok(topics)
157    }
158
159    pub async fn topic_stats(
160        &self,
161        tenant: &str,
162        namespace: &str,
163        topic: &str,
164    ) -> Result<TopicStats, Box<dyn Error>> {
165        let url_path = format!("{}/{}/{}/{}/stats", URL_PERSISTENT, tenant, namespace, topic);
166        let response =self.inner_http_client.get(url_path.as_str()).await?;
167        let topic_stats: TopicStats = serde_json::from_str(response.as_str())?;
168        Ok(topic_stats)
169    }
170}
171
// Integration tests: each one builds a `PulsarAdmin` pointed at
// PULSAR_HOST:PULSAR_PORT and works inside a freshly created, randomly named
// namespace under the `public` tenant, so they presumably need a reachable
// Pulsar admin endpoint at that address to pass.
#[cfg(test)]
mod tests {
    use crate::{PulsarAdmin, util};

    const PULSAR_HOST: &str = "127.0.0.1";
    const PULSAR_PORT: u16 = 8080;

    /// Create, list and delete a single non-partitioned topic.
    #[tokio::test]
    async fn test_non_partitioned_topic() {
        let pulsar_admin = PulsarAdmin::new(PULSAR_HOST, PULSAR_PORT, None);
        let namespaces = pulsar_admin.namespaces();
        let persistent_topics = pulsar_admin.persistent_topics();
        let tenant = "public";
        let namespace = util::rand_str(8);
        println!("test_non_partitioned_topic namespace: {:?}", namespace);
        let result = namespaces.create_namespace(tenant, namespace.as_str()).await;
        assert!(result.is_ok(), "failed to create namespace {}", namespace);
        let topic = "test_non_partitioned_topic";
        // `expect` keeps the underlying error in the panic message, which a
        // bare `assert!(result.is_ok())` would throw away.
        persistent_topics
            .create_non_partitioned_topic(tenant, namespace.as_str(), topic)
            .await
            .expect("failed to create non-partitioned topic");
        let topics = persistent_topics
            .list_non_partitioned_topic(tenant, namespace.as_str())
            .await
            .expect("failed to list non-partitioned topics");
        assert_eq!(topics.len(), 1);
        assert_eq!(topics[0], format!("persistent://{}/{}/{}", tenant, namespace, topic));
        persistent_topics
            .delete_non_partitioned_topic(tenant, namespace.as_str(), topic)
            .await
            .expect("failed to delete non-partitioned topic");
        let result = namespaces.delete_namespace(tenant, namespace.as_str()).await;
        assert!(result.is_ok(), "failed to delete namespace {}", namespace);
    }

    /// Same lifecycle as above, additionally fetching and deserializing the
    /// topic's stats before deleting it.
    #[tokio::test]
    async fn test_non_partitioned_topic_with_topic_stats() {
        let pulsar_admin = PulsarAdmin::new(PULSAR_HOST, PULSAR_PORT, None);
        let namespaces = pulsar_admin.namespaces();
        let persistent_topics = pulsar_admin.persistent_topics();
        let tenant = "public";
        let namespace = util::rand_str(8);
        println!("test_non_partitioned_topic_with_topic_stats namespace: {:?}", namespace);
        let result = namespaces.create_namespace(tenant, namespace.as_str()).await;
        assert!(result.is_ok(), "failed to create namespace {}", namespace);
        let topic = "test_non_partitioned_topic_with_topic_stats";
        persistent_topics
            .create_non_partitioned_topic(tenant, namespace.as_str(), topic)
            .await
            .expect("failed to create non-partitioned topic");
        let topics = persistent_topics
            .list_non_partitioned_topic(tenant, namespace.as_str())
            .await
            .expect("failed to list non-partitioned topics");
        assert_eq!(topics.len(), 1);
        assert_eq!(topics[0], format!("persistent://{}/{}/{}", tenant, namespace, topic));
        let result = persistent_topics.topic_stats(tenant, namespace.as_str(), topic).await;
        println!("topic_stats: {:?}", result);
        assert!(result.is_ok(), "failed to fetch topic stats: {:?}", result.err());
        persistent_topics
            .delete_non_partitioned_topic(tenant, namespace.as_str(), topic)
            .await
            .expect("failed to delete non-partitioned topic");
        let result = namespaces.delete_namespace(tenant, namespace.as_str()).await;
        assert!(result.is_ok(), "failed to delete namespace {}", namespace);
    }

    /// Create, list and delete a single partitioned topic with three
    /// partitions.
    #[tokio::test]
    async fn test_partitioned_topic() {
        let pulsar_admin = PulsarAdmin::new(PULSAR_HOST, PULSAR_PORT, None);
        let namespaces = pulsar_admin.namespaces();
        let persistent_topics = pulsar_admin.persistent_topics();
        let tenant = "public";
        let namespace = util::rand_str(8);
        println!("test_partitioned_topic namespace: {:?}", namespace);
        let result = namespaces.create_namespace(tenant, namespace.as_str()).await;
        assert!(result.is_ok(), "failed to create namespace {}", namespace);
        let topic = "test_partitioned_topic";
        let num_partitions = 3;
        persistent_topics
            .create_partitioned_topic(tenant, namespace.as_str(), topic, num_partitions)
            .await
            .expect("failed to create partitioned topic");
        let topics = persistent_topics
            .list_partitioned_topic(tenant, namespace.as_str())
            .await
            .expect("failed to list partitioned topics");
        assert_eq!(topics.len(), 1);
        assert_eq!(topics[0], format!("persistent://{}/{}/{}", tenant, namespace, topic));
        persistent_topics
            .delete_partitioned_topic(tenant, namespace.as_str(), topic)
            .await
            .expect("failed to delete partitioned topic");
        let result = namespaces.delete_namespace(tenant, namespace.as_str()).await;
        assert!(result.is_ok(), "failed to delete namespace {}", namespace);
    }
}