1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use std::convert::TryFrom;
use std::fmt;
use crate::derive::Decode;
use crate::derive::Encode;
pub type Offset = i64;
pub type Size = u32;
#[derive(Debug, Encode, Decode, Clone)]
#[fluvio(encode_discriminant)]
#[repr(u8)]
pub enum Isolation {
ReadUncommitted = 0,
ReadCommitted = 1,
}
impl Default for Isolation {
fn default() -> Self {
Isolation::ReadUncommitted
}
}
#[derive(Hash, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Encode, Decode)]
pub struct ReplicaKey {
pub topic: String,
pub partition: i32,
}
unsafe impl Send for ReplicaKey {}
unsafe impl Sync for ReplicaKey {}
impl ReplicaKey {
pub fn new<S, P>(topic: S, partition: P) -> Self
where
S: Into<String>,
P: Into<i32>,
{
ReplicaKey {
topic: topic.into(),
partition: partition.into(),
}
}
pub fn split(self) -> (String, i32) {
(self.topic, self.partition)
}
}
impl std::fmt::Display for ReplicaKey {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}-{}", self.topic, self.partition)
}
}
impl<S> From<(S, i32)> for ReplicaKey
where
S: Into<String>,
{
fn from(key: (S, i32)) -> ReplicaKey {
ReplicaKey::new(key.0.into(), key.1)
}
}
impl TryFrom<String> for ReplicaKey {
type Error = PartitionError;
fn try_from(value: String) -> Result<Self, PartitionError> {
let (topic, partition) = decompose_partition_name(&value)?;
Ok(ReplicaKey::new(topic, partition))
}
}
#[derive(Debug)]
pub enum PartitionError {
InvalidSyntax(String),
}
impl fmt::Display for PartitionError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::InvalidSyntax(msg) => write!(f, "invalid partition syntax: {}", msg),
}
}
}
pub trait PartitionOffset {
fn last_stable_offset(&self) -> i64;
fn start_offset(&self) -> i64;
}
pub fn decompose_partition_name(partition_name: &str) -> Result<(String, i32), PartitionError> {
let dash_pos = partition_name.rfind('-');
if dash_pos.is_none() {
return Err(PartitionError::InvalidSyntax(partition_name.to_owned()));
}
let pos = dash_pos.unwrap();
if (pos + 1) >= partition_name.len() {
return Err(PartitionError::InvalidSyntax(partition_name.to_owned()));
}
let topic_name = &partition_name[..pos];
let idx_string = &partition_name[(pos + 1)..];
let idx = match idx_string.parse::<i32>() {
Ok(n) => n,
Err(_) => {
return Err(PartitionError::InvalidSyntax(partition_name.to_owned()));
}
};
Ok((topic_name.to_string(), idx))
}
pub fn create_partition_name(topic_name: &str, idx: &i32) -> String {
format!("{}-{}", topic_name, idx)
}