kf_protocol_api/
common.rs

1use std::convert::TryFrom;
2use std::fmt;
3
4use serde::{Serialize, Deserialize};
5
6use kf_protocol_derive::Decode;
7use kf_protocol_derive::Encode;
8
9#[derive(Debug, Encode, Serialize, Deserialize, Decode, Clone)]
10#[fluvio_kf(encode_discriminant)]
11#[repr(u8)]
12pub enum Isolation {
13    ReadUncommitted = 0,
14    ReadCommitted = 1,
15}
16
17impl Default for Isolation {
18    fn default() -> Self {
19        Isolation::ReadUncommitted
20    }
21}
22
23
24
25
26#[derive(Hash, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Encode, Decode)]
27pub struct ReplicaKey {
28    pub topic: String,
29    pub partition: i32,
30}
31
32unsafe impl Send for ReplicaKey{}
33
34unsafe impl Sync for ReplicaKey{}
35
36impl ReplicaKey {
37    pub fn new<S, P>(topic: S, partition: P) -> Self
38    where
39        S: Into<String>,
40        P: Into<i32>,
41    {
42        ReplicaKey {
43            topic: topic.into(),
44            partition: partition.into(),
45        }
46    }
47
48    pub fn split(self) -> (String, i32) {
49        (self.topic, self.partition)
50    }
51}
52
53impl std::fmt::Display for ReplicaKey {
54    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
55        write!(f, "{}-{}", self.topic, self.partition)
56    }
57}
58
59impl<S> From<(S, i32)> for ReplicaKey
60where
61    S: Into<String>,
62{
63    fn from(key: (S, i32)) -> ReplicaKey {
64        ReplicaKey::new(key.0.into(), key.1)
65    }
66}
67
68impl TryFrom<String> for ReplicaKey {
69    type Error = PartitionError;
70
71    fn try_from(value: String) -> Result<Self, PartitionError> {
72        let (topic, partition) = decompose_partition_name(&value)?;
73        Ok(ReplicaKey::new(topic, partition))
74    }
75}
76
77
78
79#[derive(Debug)]
80pub enum PartitionError {
81    InvalidSyntax(String),
82}
83
84impl fmt::Display for PartitionError {
85    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
86        match self {
87            Self::InvalidSyntax(msg) => write!(f, "invalid partition syntax: {}", msg),
88        }
89    }
90}
91
92// returns a tuple (topic_name, idx)
93pub fn decompose_partition_name(partition_name: &str) -> Result<(String, i32), PartitionError> {
94    let dash_pos = partition_name.rfind('-');
95    if dash_pos.is_none() {
96        return Err(PartitionError::InvalidSyntax(partition_name.to_owned()));
97    }
98
99    let pos = dash_pos.unwrap();
100    if (pos + 1) >= partition_name.len() {
101        return Err(PartitionError::InvalidSyntax(partition_name.to_owned()));
102    }
103
104    let topic_name = &partition_name[..pos];
105    let idx_string = &partition_name[(pos + 1)..];
106    let idx = match idx_string.parse::<i32>() {
107        Ok(n) => n,
108        Err(_) => {
109            return Err(PartitionError::InvalidSyntax(partition_name.to_owned()));
110        }
111    };
112
113    Ok((topic_name.to_string(), idx))
114}
115
116pub fn create_partition_name(topic_name: &str, idx: &i32) -> String {
117    format!("{}-{}", topic_name.clone(), idx)
118}