kafkang 0.2.1

Rust client for Apache Kafka
Documentation
//! Types related to topic metadata for introspection by clients.
//! Example: `KafkaClient::topics()`.

use std::collections::hash_map;
use std::fmt;

use super::KafkaClient;
use super::state::{ClientState, TopicPartition, TopicPartitionIter, TopicPartitions};

// public re-export
pub use super::state::Broker;
pub use super::state::TopicNames;

/// A view on the loaded metadata about topics and their partitions.
pub struct Topics<'a> {
    state: &'a ClientState,
}

impl<'a> Topics<'a> {
    /// Constructs a view of the currently loaded topic metadata from
    /// the specified kafka client.
    #[inline]
    #[must_use]
    pub fn new(client: &KafkaClient) -> Topics<'_> {
        Topics {
            state: &client.state,
        }
    }

    /// Retrieves the number of the underlying topics.
    #[inline]
    #[must_use]
    pub fn len(&self) -> usize {
        self.state.num_topics()
    }

    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Provides an iterator over the underlying topics.
    #[inline]
    #[must_use]
    pub fn iter(&'a self) -> TopicIter<'a> {
        TopicIter::new(self.state)
    }

    /// A convenience method to return an iterator over the topics'
    /// names.
    #[inline]
    #[must_use]
    pub fn names(&'a self) -> TopicNames<'a> {
        self.state.topic_names()
    }

    /// A convenience method to determine whether the specified topic
    /// is known.
    #[inline]
    #[must_use]
    pub fn contains(&'a self, topic: &str) -> bool {
        self.state.contains_topic(topic)
    }

    /// Retrieves the partitions of a specified topic.
    #[inline]
    #[must_use]
    pub fn partitions(&'a self, topic: &str) -> Option<Partitions<'a>> {
        self.state.partitions_for(topic).map(|tp| Partitions {
            state: self.state,
            tp,
        })
    }
}

impl fmt::Debug for Topics<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Topics {{ topics: [")?;
        let mut ts = self.iter();
        if let Some(t) = ts.next() {
            write!(f, "{t:?}")?;
        }
        for t in ts {
            write!(f, ", {t:?}")?;
        }
        write!(f, "] }}")
    }
}

impl<'a> IntoIterator for &'a Topics<'a> {
    type Item = Topic<'a>;
    type IntoIter = TopicIter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}

impl<'a> IntoIterator for Topics<'a> {
    type Item = Topic<'a>;
    type IntoIter = TopicIter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        TopicIter::new(self.state)
    }
}

/// An iterator over topics.
pub struct TopicIter<'a> {
    state: &'a ClientState,
    iter: hash_map::Iter<'a, String, TopicPartitions>,
}

impl<'a> TopicIter<'a> {
    fn new(state: &'a ClientState) -> TopicIter<'a> {
        TopicIter {
            state,
            iter: state.topic_partitions().iter(),
        }
    }
}

impl<'a> Iterator for TopicIter<'a> {
    type Item = Topic<'a>;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        self.iter.next().map(|(name, tps)| Topic {
            state: self.state,
            name: &name[..],
            tp: tps,
        })
    }
}

/// A view on the loaded metadata for a particular topic.
pub struct Topic<'a> {
    state: &'a ClientState,
    name: &'a str,
    tp: &'a TopicPartitions,
}

impl<'a> Topic<'a> {
    /// Retrieves the name of this topic.
    #[inline]
    #[must_use]
    pub fn name(&self) -> &str {
        self.name
    }

    /// Retrieves the list of all partitions for this topic.
    #[inline]
    #[must_use]
    pub fn partitions(&self) -> Partitions<'a> {
        Partitions {
            state: self.state,
            tp: self.tp,
        }
    }
}

impl fmt::Debug for Topic<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "Topic {{ name: {}, partitions: {:?} }}",
            self.name,
            self.partitions()
        )
    }
}

/// Metadata relevant to partitions of a particular topic.
pub struct Partitions<'a> {
    state: &'a ClientState,
    tp: &'a TopicPartitions,
}

impl<'a> Partitions<'a> {
    /// Retrieves the number of the topic's partitions.
    #[inline]
    #[must_use]
    pub fn len(&self) -> usize {
        self.tp.len()
    }

    /// Tests for `.len() > 0`.
    #[inline]
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.tp.is_empty()
    }

    /// Retrieves an iterator of the partitions of the underlying topic.
    #[inline]
    #[must_use]
    pub fn iter(&self) -> PartitionIter<'a> {
        PartitionIter::new(self.state, self.tp)
    }

    /// Finds a specified partition identified by its id.
    #[inline]
    #[must_use]
    pub fn partition(&self, partition_id: i32) -> Option<Partition<'a>> {
        self.tp
            .partition(partition_id)
            .map(|p| Partition::new(self.state, p, partition_id))
    }

    /// Convenience method to retrieve the identifiers of all
    /// currently "available" partitions.  Such partitions are known
    /// to have a leader broker and can be sent messages to.
    #[inline]
    #[must_use]
    pub fn available_ids(&self) -> Vec<i32> {
        self.tp
            .iter()
            .filter_map(|(id, p)| p.broker(self.state).map(|_| id))
            .collect()
    }
}

impl fmt::Debug for Partitions<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Partitions {{ [")?;
        let mut ps = self.iter();
        if let Some(p) = ps.next() {
            write!(f, "{p:?}")?;
        }
        for p in ps {
            write!(f, ", {p:?}")?;
        }
        write!(f, "] }}")
    }
}

impl<'a> IntoIterator for &'a Partitions<'a> {
    type Item = Partition<'a>;
    type IntoIter = PartitionIter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}

impl<'a> IntoIterator for Partitions<'a> {
    type Item = Partition<'a>;
    type IntoIter = PartitionIter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        PartitionIter::new(self.state, self.tp)
    }
}

/// An iterator over a topic's partitions.
pub struct PartitionIter<'a> {
    state: &'a ClientState,
    iter: TopicPartitionIter<'a>,
}

impl<'a> PartitionIter<'a> {
    fn new(state: &'a ClientState, tp: &'a TopicPartitions) -> Self {
        PartitionIter {
            state,
            iter: tp.iter(),
        }
    }
}

impl<'a> Iterator for PartitionIter<'a> {
    type Item = Partition<'a>;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        self.iter
            .next()
            .map(|(id, p)| Partition::new(self.state, p, id))
    }
}

/// Metadata about a particular topic partition.
///
/// A partition can be seen as either available or not by
/// `kafkang`.  "Available" partitions are partitions with an
/// assigned leader broker and can be send messages to or fetched
/// messages from.  Non-available partitions are ignored by
/// `kafkang`.  Whether or not a partition is currently "available"
/// can be determined by testing for `partition.leader().is_some()` or
/// more directly through `partition.is_available()`.
pub struct Partition<'a> {
    state: &'a ClientState,
    tp: &'a TopicPartition,
    id: i32,
}

impl<'a> Partition<'a> {
    fn new(state: &'a ClientState, partition: &'a TopicPartition, id: i32) -> Partition<'a> {
        Self {
            state,
            tp: partition,
            id,
        }
    }

    /// Retrieves the identifier of this topic partition.
    #[inline]
    #[must_use]
    pub fn id(&self) -> i32 {
        self.id
    }

    /// Retrieves the current leader broker of this partition - if
    /// any.  A partition with a leader is said to be "available".
    #[inline]
    #[must_use]
    pub fn leader(&self) -> Option<&'a Broker> {
        self.tp.broker(self.state)
    }

    /// Determines whether this partition is currently "available".
    /// See `Partition::leader()`.
    #[must_use]
    pub fn is_available(&self) -> bool {
        self.leader().is_some()
    }
}

impl fmt::Debug for Partition<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "Partition {{ id: {}, leader: {:?} }}",
            self.id(),
            self.leader()
        )
    }
}