pulsar-admin 0.0.2

A Rust-based HTTP client for interacting with the Apache Pulsar REST API
Documentation
use std::collections::HashMap;
use std::error::Error;

use serde::{Deserialize, Serialize};

use crate::inner_http_client::InnerHttpClient;
use crate::url_constants::URL_PERSISTENT;

pub struct PersistentTopics<'a> {
    inner_http_client: &'a InnerHttpClient,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TopicStats {
    #[serde(rename = "msgRateIn")]
    msg_rate_in: f64,
    #[serde(rename = "msgThroughputIn")]
    msg_throughput_in: f64,
    #[serde(rename = "msgRateOut")]
    msg_rate_out: f64,
    #[serde(rename = "msgThroughputOut")]
    msg_throughput_out: f64,
    #[serde(rename = "bytesInCounter")]
    bytes_in_counter: u64,
    #[serde(rename = "msgInCounter")]
    msg_in_counter: u64,
    #[serde(rename = "bytesOutCounter")]
    bytes_out_counter: u64,
    #[serde(rename = "msgOutCounter")]
    msg_out_counter: u64,
    #[serde(rename = "averageMsgSize")]
    average_msg_size: f64,
    #[serde(rename = "msgChunkPublished")]
    msg_chunk_published: bool,
    #[serde(rename = "storageSize")]
    storage_size: u64,
    #[serde(rename = "backlogSize")]
    backlog_size: u64,
    #[serde(rename = "publishRateLimitedTimes")]
    publish_rate_limited_times: u64,
    #[serde(rename = "earliestMsgPublishTimeInBacklogs")]
    earliest_msg_publish_time_in_backlogs: u64,
    #[serde(rename = "offloadedStorageSize")]
    offloaded_storage_size: u64,
    #[serde(rename = "lastOffloadLedgerId")]
    last_offload_ledger_id: u64,
    #[serde(rename = "lastOffloadSuccessTimeStamp")]
    last_offload_success_time_stamp: u64,
    #[serde(rename = "lastOffloadFailureTimeStamp")]
    last_offload_failure_time_stamp: u64,
    #[serde(rename = "ongoingTxnCount")]
    ongoing_txn_count: Option<u64>,
    #[serde(rename = "abortedTxnCount")]
    aborted_txn_count: Option<u64>,
    #[serde(rename = "committedTxnCount")]
    committed_txn_count: Option<u64>,
    #[serde(rename = "publishers")]
    publishers: Vec<String>,
    #[serde(rename = "waitingPublishers")]
    waiting_publishers: u64,
    #[serde(rename = "subscriptions")]
    subscriptions: HashMap<String, String>,
    #[serde(rename = "replication")]
    replication: HashMap<String, String>,
    #[serde(rename = "deduplicationStatus")]
    deduplication_status: String,
    #[serde(rename = "nonContiguousDeletedMessagesRanges")]
    non_contiguous_deleted_messages_ranges: u64,
    #[serde(rename = "nonContiguousDeletedMessagesRangesSerializedSize")]
    non_contiguous_deleted_messages_ranges_serialized_size: u64,
    #[serde(rename = "delayedMessageIndexSizeInBytes")]
    delayed_message_index_size_in_bytes: Option<u64>,
    #[serde(rename = "compaction")]
    compaction: Compaction,
    #[serde(rename = "ownerBroker")]
    owner_broker: Option<String>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct Compaction {
    #[serde(rename = "lastCompactionRemovedEventCount")]
    last_compaction_removed_event_count: u64,
    #[serde(rename = "lastCompactionSucceedTimestamp")]
    last_compaction_succeed_timestamp: u64,
    #[serde(rename = "lastCompactionFailedTimestamp")]
    last_compaction_failed_timestamp: u64,
    #[serde(rename = "lastCompactionDurationTimeInMills")]
    last_compaction_duration_time_in_mills: u64,
}

impl<'a> PersistentTopics<'a> {
    pub fn new(inner_http_client: &'a InnerHttpClient) -> Self {
        PersistentTopics { inner_http_client }
    }

    pub async fn create_non_partitioned_topic(
        &self,
        tenant: &str,
        namespace: &str,
        topic: &str,
    ) -> Result<(), Box<dyn Error>> {
        let url_path = format!("{}/{}/{}/{}", URL_PERSISTENT, tenant, namespace, topic);
        self.inner_http_client.put(url_path.as_str(), "".to_string()).await
    }

    pub async fn delete_non_partitioned_topic(
        &self,
        tenant: &str,
        namespace: &str,
        topic: &str,
    ) -> Result<(), Box<dyn Error>> {
        let url_path = format!("{}/{}/{}/{}", URL_PERSISTENT, tenant, namespace, topic);
        self.inner_http_client.delete(url_path.as_str()).await
    }

    pub async fn list_non_partitioned_topic(
        &self,
        tenant: &str,
        namespace: &str,
    ) -> Result<Vec<String>, Box<dyn Error>> {
        let url_path = format!("{}/{}/{}", URL_PERSISTENT, tenant, namespace);
        let response = self.inner_http_client.get(url_path.as_str()).await?;
        let topics: Vec<String> = serde_json::from_str(&response)?;
        Ok(topics)
    }

    pub async fn create_partitioned_topic(
        &self,
        tenant: &str,
        namespace: &str,
        topic: &str,
        num_partitions: i32,
    ) -> Result<(), Box<dyn Error>> {
        let url_path = format!("{}/{}/{}/{}/partitions", URL_PERSISTENT, tenant, namespace, topic);
        self.inner_http_client.put(url_path.as_str(), num_partitions.to_string()).await
    }

    pub async fn delete_partitioned_topic(
        &self,
        tenant: &str,
        namespace: &str,
        topic: &str,
    ) -> Result<(), Box<dyn Error>> {
        let url_path = format!("{}/{}/{}/{}/partitions", URL_PERSISTENT, tenant, namespace, topic);
        self.inner_http_client.delete(url_path.as_str()).await
    }

    pub async fn list_partitioned_topic(
        &self,
        tenant: &str,
        namespace: &str,
    ) -> Result<Vec<String>, Box<dyn Error>> {
        let url_path = format!("{}/{}/{}/partitioned", URL_PERSISTENT, tenant, namespace);
        let response = self.inner_http_client.get(url_path.as_str()).await?;
        let topics: Vec<String> = serde_json::from_str(&response)?;
        Ok(topics)
    }

    pub async fn topic_stats(
        &self,
        tenant: &str,
        namespace: &str,
        topic: &str,
    ) -> Result<TopicStats, Box<dyn Error>> {
        let url_path = format!("{}/{}/{}/{}/stats", URL_PERSISTENT, tenant, namespace, topic);
        let response =self.inner_http_client.get(url_path.as_str()).await?;
        let topic_stats: TopicStats = serde_json::from_str(response.as_str())?;
        Ok(topic_stats)
    }
}

#[cfg(test)]
mod tests {
    use crate::{PulsarAdmin, util};

    const PULSAR_HOST: &str = "127.0.0.1";
    const PULSAR_PORT: u16 = 8080;

    #[tokio::test]
    async fn test_non_partitioned_topic() {
        let pulsar_admin = PulsarAdmin::new(PULSAR_HOST, PULSAR_PORT, None);
        let namespaces = pulsar_admin.namespaces();
        let persistent_topics = pulsar_admin.persistent_topics();
        let tenant = "public";
        let namespace = util::rand_str(8);
        println!("test_partitioned_topic namespace: {:?}", namespace);
        let result = namespaces.create_namespace(tenant, namespace.as_str()).await;
        assert!(result.is_ok());
        let topic = "test_partitioned_topic";
        let result = persistent_topics.create_non_partitioned_topic(tenant, namespace.as_str(), topic).await;
        assert!(result.is_ok());
        let result = persistent_topics.list_non_partitioned_topic(tenant, namespace.as_str()).await;
        assert!(result.is_ok());
        let topics = result.unwrap();
        assert_eq!(topics.len(), 1);
        assert_eq!(topics[0], format!("persistent://{}/{}/{}", tenant, namespace, topic));
        let result = persistent_topics.delete_non_partitioned_topic(tenant, namespace.as_str(), topic).await;
        assert!(result.is_ok());
        let result = namespaces.delete_namespace(tenant, namespace.as_str()).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_non_partitioned_topic_with_topic_stats() {
        let pulsar_admin = PulsarAdmin::new(PULSAR_HOST, PULSAR_PORT, None);
        let namespaces = pulsar_admin.namespaces();
        let persistent_topics = pulsar_admin.persistent_topics();
        let tenant = "public";
        let namespace = util::rand_str(8);
        println!("test_partitioned_topic namespace: {:?}", namespace);
        let result = namespaces.create_namespace(tenant, namespace.as_str()).await;
        assert!(result.is_ok());
        let topic = "test_partitioned_topic";
        let result = persistent_topics.create_non_partitioned_topic(tenant, namespace.as_str(), topic).await;
        assert!(result.is_ok());
        let result = persistent_topics.list_non_partitioned_topic(tenant, namespace.as_str()).await;
        assert!(result.is_ok());
        let topics = result.unwrap();
        assert_eq!(topics.len(), 1);
        assert_eq!(topics[0], format!("persistent://{}/{}/{}", tenant, namespace, topic));
        let result = persistent_topics.topic_stats(tenant, namespace.as_str(), topic).await;
        println!("topic_stats: {:?}", result);
        assert!(result.is_ok());
        let result = persistent_topics.delete_non_partitioned_topic(tenant, namespace.as_str(), topic).await;
        assert!(result.is_ok());
        let result = namespaces.delete_namespace(tenant, namespace.as_str()).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_partitioned_topic() {
        let pulsar_admin = PulsarAdmin::new(PULSAR_HOST, PULSAR_PORT, None);
        let namespaces = pulsar_admin.namespaces();
        let persistent_topics = pulsar_admin.persistent_topics();
        let tenant = "public";
        let namespace = util::rand_str(8);
        println!("test_partitioned_topic namespace: {:?}", namespace);
        let result = namespaces.create_namespace(tenant, namespace.as_str()).await;
        assert!(result.is_ok());
        let topic = "test_partitioned_topic";
        let num_partitions = 3;
        let result = persistent_topics.create_partitioned_topic(tenant, namespace.as_str(), topic, num_partitions).await;
        assert!(result.is_ok());
        let result = persistent_topics.list_partitioned_topic(tenant, namespace.as_str()).await;
        assert!(result.is_ok());
        let topics = result.unwrap();
        assert_eq!(topics.len(), 1);
        assert_eq!(topics[0], format!("persistent://{}/{}/{}", tenant, namespace, topic));
        let result = persistent_topics.delete_partitioned_topic(tenant, namespace.as_str(), topic).await;
        assert!(result.is_ok());
        let result = namespaces.delete_namespace(tenant, namespace.as_str()).await;
        assert!(result.is_ok());
    }
}