// kf_protocol_api/common.rs
use std::convert::TryFrom;
use std::fmt;

use serde::{Serialize, Deserialize};

use kf_protocol_derive::Decode;
use kf_protocol_derive::Encode;
8
9#[derive(Debug, Encode, Serialize, Deserialize, Decode, Clone)]
10#[fluvio_kf(encode_discriminant)]
11#[repr(u8)]
12pub enum Isolation {
13 ReadUncommitted = 0,
14 ReadCommitted = 1,
15}
16
17impl Default for Isolation {
18 fn default() -> Self {
19 Isolation::ReadUncommitted
20 }
21}
22
23
24
25
26#[derive(Hash, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Encode, Decode)]
27pub struct ReplicaKey {
28 pub topic: String,
29 pub partition: i32,
30}
31
32unsafe impl Send for ReplicaKey{}
33
34unsafe impl Sync for ReplicaKey{}
35
36impl ReplicaKey {
37 pub fn new<S, P>(topic: S, partition: P) -> Self
38 where
39 S: Into<String>,
40 P: Into<i32>,
41 {
42 ReplicaKey {
43 topic: topic.into(),
44 partition: partition.into(),
45 }
46 }
47
48 pub fn split(self) -> (String, i32) {
49 (self.topic, self.partition)
50 }
51}
52
53impl std::fmt::Display for ReplicaKey {
54 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
55 write!(f, "{}-{}", self.topic, self.partition)
56 }
57}
58
59impl<S> From<(S, i32)> for ReplicaKey
60where
61 S: Into<String>,
62{
63 fn from(key: (S, i32)) -> ReplicaKey {
64 ReplicaKey::new(key.0.into(), key.1)
65 }
66}
67
68impl TryFrom<String> for ReplicaKey {
69 type Error = PartitionError;
70
71 fn try_from(value: String) -> Result<Self, PartitionError> {
72 let (topic, partition) = decompose_partition_name(&value)?;
73 Ok(ReplicaKey::new(topic, partition))
74 }
75}
76
77
78
/// Error returned when a partition name cannot be parsed.
#[derive(Debug)]
pub enum PartitionError {
    /// The input was rejected; the payload is printed after the
    /// `invalid partition syntax: ` prefix by the `Display` impl.
    InvalidSyntax(String),
}

impl fmt::Display for PartitionError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::InvalidSyntax(msg) => write!(f, "invalid partition syntax: {}", msg),
        }
    }
}

// Implementing the standard error trait lets callers box this error or
// propagate it with `?` into `Box<dyn std::error::Error>`.
impl std::error::Error for PartitionError {}
91
92pub fn decompose_partition_name(partition_name: &str) -> Result<(String, i32), PartitionError> {
94 let dash_pos = partition_name.rfind('-');
95 if dash_pos.is_none() {
96 return Err(PartitionError::InvalidSyntax(partition_name.to_owned()));
97 }
98
99 let pos = dash_pos.unwrap();
100 if (pos + 1) >= partition_name.len() {
101 return Err(PartitionError::InvalidSyntax(partition_name.to_owned()));
102 }
103
104 let topic_name = &partition_name[..pos];
105 let idx_string = &partition_name[(pos + 1)..];
106 let idx = match idx_string.parse::<i32>() {
107 Ok(n) => n,
108 Err(_) => {
109 return Err(PartitionError::InvalidSyntax(partition_name.to_owned()));
110 }
111 };
112
113 Ok((topic_name.to_string(), idx))
114}
115
/// Builds a partition name of the form `<topic_name>-<idx>`.
///
/// `idx` is taken by reference to keep the existing signature; it is only
/// formatted, never stored.
pub fn create_partition_name(topic_name: &str, idx: &i32) -> String {
    // Calling `.clone()` on a `&str` merely copies the reference, so the
    // borrowed name is formatted directly.
    format!("{}-{}", topic_name, idx)
}