fluvio_dataplane_protocol/
common.rs

1use std::convert::TryFrom;
2use std::fmt;
3
4use crate::derive::{Encoder, Decoder};
5
6pub type Offset = i64;
7pub type Size = u32;
8pub type Size64 = u64;
9
10#[derive(Debug, Encoder, Decoder, Clone, Copy, Eq, PartialEq)]
11#[fluvio(encode_discriminant)]
12#[repr(u8)]
13pub enum Isolation {
14    ReadUncommitted = 0,
15    ReadCommitted = 1,
16}
17
18impl Default for Isolation {
19    fn default() -> Self {
20        Isolation::ReadUncommitted
21    }
22}
23
24#[derive(Hash, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Encoder, Decoder)]
25pub struct ReplicaKey {
26    pub topic: String,
27    pub partition: i32,
28}
29
30impl fmt::Debug for ReplicaKey {
31    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
32        write!(f, "({},{})", self.topic, self.partition)
33    }
34}
35
36unsafe impl Send for ReplicaKey {}
37
38unsafe impl Sync for ReplicaKey {}
39
40impl ReplicaKey {
41    pub fn new<S, P>(topic: S, partition: P) -> Self
42    where
43        S: Into<String>,
44        P: Into<i32>,
45    {
46        ReplicaKey {
47            topic: topic.into(),
48            partition: partition.into(),
49        }
50    }
51
52    pub fn split(self) -> (String, i32) {
53        (self.topic, self.partition)
54    }
55}
56
57impl std::fmt::Display for ReplicaKey {
58    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
59        write!(f, "{}-{}", self.topic, self.partition)
60    }
61}
62
63impl<S> From<(S, i32)> for ReplicaKey
64where
65    S: Into<String>,
66{
67    fn from(key: (S, i32)) -> ReplicaKey {
68        ReplicaKey::new(key.0.into(), key.1)
69    }
70}
71
72impl TryFrom<String> for ReplicaKey {
73    type Error = PartitionError;
74
75    fn try_from(value: String) -> Result<Self, PartitionError> {
76        let (topic, partition) = decompose_partition_name(&value)?;
77        Ok(ReplicaKey::new(topic, partition))
78    }
79}
80
81#[derive(Debug)]
82pub enum PartitionError {
83    InvalidSyntax(String),
84}
85
86impl fmt::Display for PartitionError {
87    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
88        match self {
89            Self::InvalidSyntax(msg) => write!(f, "invalid partition syntax: {}", msg),
90        }
91    }
92}
93
94/// Offset information about Partition
95pub trait PartitionOffset {
96    /// last offset that was committed
97    fn last_stable_offset(&self) -> i64;
98
99    // beginning offset for the partition
100    fn start_offset(&self) -> i64;
101}
102
103// returns a tuple (topic_name, idx)
104pub fn decompose_partition_name(partition_name: &str) -> Result<(String, i32), PartitionError> {
105    let dash_pos = partition_name.rfind('-');
106    if dash_pos.is_none() {
107        return Err(PartitionError::InvalidSyntax(partition_name.to_owned()));
108    }
109
110    let pos = dash_pos.unwrap();
111    if (pos + 1) >= partition_name.len() {
112        return Err(PartitionError::InvalidSyntax(partition_name.to_owned()));
113    }
114
115    let topic_name = &partition_name[..pos];
116    let idx_string = &partition_name[(pos + 1)..];
117    let idx = match idx_string.parse::<i32>() {
118        Ok(n) => n,
119        Err(_) => {
120            return Err(PartitionError::InvalidSyntax(partition_name.to_owned()));
121        }
122    };
123
124    Ok((topic_name.to_string(), idx))
125}
126
127pub fn create_partition_name(topic_name: &str, idx: &i32) -> String {
128    format!("{}-{}", topic_name, idx)
129}