1use std::collections::HashMap;
2use std::error::Error;
3
4use serde::{Deserialize, Serialize};
5
6use crate::inner_http_client::InnerHttpClient;
7use crate::url_constants::URL_PERSISTENT;
8
9pub struct PersistentTopics<'a> {
10 inner_http_client: &'a InnerHttpClient,
11}
12
13#[derive(Serialize, Deserialize, Debug)]
14pub struct TopicStats {
15 #[serde(rename = "msgRateIn")]
16 msg_rate_in: f64,
17 #[serde(rename = "msgThroughputIn")]
18 msg_throughput_in: f64,
19 #[serde(rename = "msgRateOut")]
20 msg_rate_out: f64,
21 #[serde(rename = "msgThroughputOut")]
22 msg_throughput_out: f64,
23 #[serde(rename = "bytesInCounter")]
24 bytes_in_counter: u64,
25 #[serde(rename = "msgInCounter")]
26 msg_in_counter: u64,
27 #[serde(rename = "bytesOutCounter")]
28 bytes_out_counter: u64,
29 #[serde(rename = "msgOutCounter")]
30 msg_out_counter: u64,
31 #[serde(rename = "averageMsgSize")]
32 average_msg_size: f64,
33 #[serde(rename = "msgChunkPublished")]
34 msg_chunk_published: bool,
35 #[serde(rename = "storageSize")]
36 storage_size: u64,
37 #[serde(rename = "backlogSize")]
38 backlog_size: u64,
39 #[serde(rename = "publishRateLimitedTimes")]
40 publish_rate_limited_times: u64,
41 #[serde(rename = "earliestMsgPublishTimeInBacklogs")]
42 earliest_msg_publish_time_in_backlogs: u64,
43 #[serde(rename = "offloadedStorageSize")]
44 offloaded_storage_size: u64,
45 #[serde(rename = "lastOffloadLedgerId")]
46 last_offload_ledger_id: u64,
47 #[serde(rename = "lastOffloadSuccessTimeStamp")]
48 last_offload_success_time_stamp: u64,
49 #[serde(rename = "lastOffloadFailureTimeStamp")]
50 last_offload_failure_time_stamp: u64,
51 #[serde(rename = "ongoingTxnCount")]
52 ongoing_txn_count: Option<u64>,
53 #[serde(rename = "abortedTxnCount")]
54 aborted_txn_count: Option<u64>,
55 #[serde(rename = "committedTxnCount")]
56 committed_txn_count: Option<u64>,
57 #[serde(rename = "publishers")]
58 publishers: Vec<String>,
59 #[serde(rename = "waitingPublishers")]
60 waiting_publishers: u64,
61 #[serde(rename = "subscriptions")]
62 subscriptions: HashMap<String, String>,
63 #[serde(rename = "replication")]
64 replication: HashMap<String, String>,
65 #[serde(rename = "deduplicationStatus")]
66 deduplication_status: String,
67 #[serde(rename = "nonContiguousDeletedMessagesRanges")]
68 non_contiguous_deleted_messages_ranges: u64,
69 #[serde(rename = "nonContiguousDeletedMessagesRangesSerializedSize")]
70 non_contiguous_deleted_messages_ranges_serialized_size: u64,
71 #[serde(rename = "delayedMessageIndexSizeInBytes")]
72 delayed_message_index_size_in_bytes: Option<u64>,
73 #[serde(rename = "compaction")]
74 compaction: Compaction,
75 #[serde(rename = "ownerBroker")]
76 owner_broker: Option<String>,
77}
78
79#[derive(Serialize, Deserialize, Debug)]
80pub struct Compaction {
81 #[serde(rename = "lastCompactionRemovedEventCount")]
82 last_compaction_removed_event_count: u64,
83 #[serde(rename = "lastCompactionSucceedTimestamp")]
84 last_compaction_succeed_timestamp: u64,
85 #[serde(rename = "lastCompactionFailedTimestamp")]
86 last_compaction_failed_timestamp: u64,
87 #[serde(rename = "lastCompactionDurationTimeInMills")]
88 last_compaction_duration_time_in_mills: u64,
89}
90
91impl<'a> PersistentTopics<'a> {
92 pub fn new(inner_http_client: &'a InnerHttpClient) -> Self {
93 PersistentTopics { inner_http_client }
94 }
95
96 pub async fn create_non_partitioned_topic(
97 &self,
98 tenant: &str,
99 namespace: &str,
100 topic: &str,
101 ) -> Result<(), Box<dyn Error>> {
102 let url_path = format!("{}/{}/{}/{}", URL_PERSISTENT, tenant, namespace, topic);
103 self.inner_http_client.put(url_path.as_str(), "".to_string()).await
104 }
105
106 pub async fn delete_non_partitioned_topic(
107 &self,
108 tenant: &str,
109 namespace: &str,
110 topic: &str,
111 ) -> Result<(), Box<dyn Error>> {
112 let url_path = format!("{}/{}/{}/{}", URL_PERSISTENT, tenant, namespace, topic);
113 self.inner_http_client.delete(url_path.as_str()).await
114 }
115
116 pub async fn list_non_partitioned_topic(
117 &self,
118 tenant: &str,
119 namespace: &str,
120 ) -> Result<Vec<String>, Box<dyn Error>> {
121 let url_path = format!("{}/{}/{}", URL_PERSISTENT, tenant, namespace);
122 let response = self.inner_http_client.get(url_path.as_str()).await?;
123 let topics: Vec<String> = serde_json::from_str(&response)?;
124 Ok(topics)
125 }
126
127 pub async fn create_partitioned_topic(
128 &self,
129 tenant: &str,
130 namespace: &str,
131 topic: &str,
132 num_partitions: i32,
133 ) -> Result<(), Box<dyn Error>> {
134 let url_path = format!("{}/{}/{}/{}/partitions", URL_PERSISTENT, tenant, namespace, topic);
135 self.inner_http_client.put(url_path.as_str(), num_partitions.to_string()).await
136 }
137
138 pub async fn delete_partitioned_topic(
139 &self,
140 tenant: &str,
141 namespace: &str,
142 topic: &str,
143 ) -> Result<(), Box<dyn Error>> {
144 let url_path = format!("{}/{}/{}/{}/partitions", URL_PERSISTENT, tenant, namespace, topic);
145 self.inner_http_client.delete(url_path.as_str()).await
146 }
147
148 pub async fn list_partitioned_topic(
149 &self,
150 tenant: &str,
151 namespace: &str,
152 ) -> Result<Vec<String>, Box<dyn Error>> {
153 let url_path = format!("{}/{}/{}/partitioned", URL_PERSISTENT, tenant, namespace);
154 let response = self.inner_http_client.get(url_path.as_str()).await?;
155 let topics: Vec<String> = serde_json::from_str(&response)?;
156 Ok(topics)
157 }
158
159 pub async fn topic_stats(
160 &self,
161 tenant: &str,
162 namespace: &str,
163 topic: &str,
164 ) -> Result<TopicStats, Box<dyn Error>> {
165 let url_path = format!("{}/{}/{}/{}/stats", URL_PERSISTENT, tenant, namespace, topic);
166 let response =self.inner_http_client.get(url_path.as_str()).await?;
167 let topic_stats: TopicStats = serde_json::from_str(response.as_str())?;
168 Ok(topic_stats)
169 }
170}
171
// Integration tests: each one builds a `PulsarAdmin` against
// `PULSAR_HOST:PULSAR_PORT` and performs real admin calls, so they need a
// reachable admin endpoint at that address to pass.
#[cfg(test)]
mod tests {
    use crate::{PulsarAdmin, util};

    const PULSAR_HOST: &str = "127.0.0.1";
    const PULSAR_PORT: u16 = 8080;

    /// Create, list and delete a non-partitioned topic inside a freshly
    /// created, randomly named namespace.
    #[tokio::test]
    async fn test_non_partitioned_topic() {
        let pulsar_admin = PulsarAdmin::new(PULSAR_HOST, PULSAR_PORT, None);
        let namespaces = pulsar_admin.namespaces();
        let persistent_topics = pulsar_admin.persistent_topics();
        let tenant = "public";
        // Random namespace keeps concurrent or repeated runs from colliding.
        let namespace = util::rand_str(8);
        println!("test_non_partitioned_topic namespace: {:?}", namespace);

        let result = namespaces.create_namespace(tenant, namespace.as_str()).await;
        assert!(result.is_ok(), "create_namespace failed: {:?}", result.err());

        let topic = "test_partitioned_topic";
        let result = persistent_topics
            .create_non_partitioned_topic(tenant, namespace.as_str(), topic)
            .await;
        assert!(result.is_ok(), "create_non_partitioned_topic failed: {:?}", result.err());

        let topics = persistent_topics
            .list_non_partitioned_topic(tenant, namespace.as_str())
            .await
            .expect("list_non_partitioned_topic failed");
        assert_eq!(topics.len(), 1);
        assert_eq!(topics[0], format!("persistent://{}/{}/{}", tenant, namespace, topic));

        let result = persistent_topics
            .delete_non_partitioned_topic(tenant, namespace.as_str(), topic)
            .await;
        assert!(result.is_ok(), "delete_non_partitioned_topic failed: {:?}", result.err());

        let result = namespaces.delete_namespace(tenant, namespace.as_str()).await;
        assert!(result.is_ok(), "delete_namespace failed: {:?}", result.err());
    }

    /// Same flow as `test_non_partitioned_topic`, plus fetching and decoding
    /// the topic statistics before cleanup.
    #[tokio::test]
    async fn test_non_partitioned_topic_with_topic_stats() {
        let pulsar_admin = PulsarAdmin::new(PULSAR_HOST, PULSAR_PORT, None);
        let namespaces = pulsar_admin.namespaces();
        let persistent_topics = pulsar_admin.persistent_topics();
        let tenant = "public";
        let namespace = util::rand_str(8);
        println!(
            "test_non_partitioned_topic_with_topic_stats namespace: {:?}",
            namespace
        );

        let result = namespaces.create_namespace(tenant, namespace.as_str()).await;
        assert!(result.is_ok(), "create_namespace failed: {:?}", result.err());

        let topic = "test_partitioned_topic";
        let result = persistent_topics
            .create_non_partitioned_topic(tenant, namespace.as_str(), topic)
            .await;
        assert!(result.is_ok(), "create_non_partitioned_topic failed: {:?}", result.err());

        let topics = persistent_topics
            .list_non_partitioned_topic(tenant, namespace.as_str())
            .await
            .expect("list_non_partitioned_topic failed");
        assert_eq!(topics.len(), 1);
        assert_eq!(topics[0], format!("persistent://{}/{}/{}", tenant, namespace, topic));

        let result = persistent_topics
            .topic_stats(tenant, namespace.as_str(), topic)
            .await;
        println!("topic_stats: {:?}", result);
        assert!(result.is_ok(), "topic_stats failed: {:?}", result.err());

        let result = persistent_topics
            .delete_non_partitioned_topic(tenant, namespace.as_str(), topic)
            .await;
        assert!(result.is_ok(), "delete_non_partitioned_topic failed: {:?}", result.err());

        let result = namespaces.delete_namespace(tenant, namespace.as_str()).await;
        assert!(result.is_ok(), "delete_namespace failed: {:?}", result.err());
    }

    /// Create, list and delete a partitioned topic inside a freshly created,
    /// randomly named namespace.
    #[tokio::test]
    async fn test_partitioned_topic() {
        let pulsar_admin = PulsarAdmin::new(PULSAR_HOST, PULSAR_PORT, None);
        let namespaces = pulsar_admin.namespaces();
        let persistent_topics = pulsar_admin.persistent_topics();
        let tenant = "public";
        let namespace = util::rand_str(8);
        println!("test_partitioned_topic namespace: {:?}", namespace);

        let result = namespaces.create_namespace(tenant, namespace.as_str()).await;
        assert!(result.is_ok(), "create_namespace failed: {:?}", result.err());

        let topic = "test_partitioned_topic";
        let num_partitions = 3;
        let result = persistent_topics
            .create_partitioned_topic(tenant, namespace.as_str(), topic, num_partitions)
            .await;
        assert!(result.is_ok(), "create_partitioned_topic failed: {:?}", result.err());

        let topics = persistent_topics
            .list_partitioned_topic(tenant, namespace.as_str())
            .await
            .expect("list_partitioned_topic failed");
        assert_eq!(topics.len(), 1);
        assert_eq!(topics[0], format!("persistent://{}/{}/{}", tenant, namespace, topic));

        let result = persistent_topics
            .delete_partitioned_topic(tenant, namespace.as_str(), topic)
            .await;
        assert!(result.is_ok(), "delete_partitioned_topic failed: {:?}", result.err());

        let result = namespaces.delete_namespace(tenant, namespace.as_str()).await;
        assert!(result.is_ok(), "delete_namespace failed: {:?}", result.err());
    }
}