fluvio_dataplane_protocol/
common.rs1use std::convert::TryFrom;
2use std::fmt;
3
4use crate::derive::{Encoder, Decoder};
5
/// Offset value, represented as a signed 64-bit integer.
pub type Offset = i64;
/// Size value, represented as an unsigned 32-bit integer.
pub type Size = u32;
/// Size value, represented as an unsigned 64-bit integer.
pub type Size64 = u64;
9
/// Isolation level selector with two variants.
///
/// `#[repr(u8)]` stores the discriminant as a `u8`, so the explicit values below are part of
/// the type's representation and must not be renumbered casually.
// NOTE(review): `#[fluvio(encode_discriminant)]` presumably makes the derived
// `Encoder`/`Decoder` serialize the numeric discriminant — confirm against the derive macro.
#[derive(Debug, Encoder, Decoder, Clone, Copy, Eq, PartialEq)]
#[fluvio(encode_discriminant)]
#[repr(u8)]
pub enum Isolation {
    /// Discriminant `0`; this is also the value returned by `Isolation::default()`.
    ReadUncommitted = 0,
    /// Discriminant `1`.
    ReadCommitted = 1,
}
17
18impl Default for Isolation {
19 fn default() -> Self {
20 Isolation::ReadUncommitted
21 }
22}
23
/// Key made of a topic name and a partition index.
///
/// `PartialOrd`/`Ord` are derived, so keys compare by `topic` first and then by `partition`
/// (declaration order of the fields). Reordering the fields would change that ordering.
#[derive(Hash, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Encoder, Decoder)]
pub struct ReplicaKey {
    /// Topic name.
    pub topic: String,
    /// Partition index within the topic.
    pub partition: i32,
}
29
30impl fmt::Debug for ReplicaKey {
31 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
32 write!(f, "({},{})", self.topic, self.partition)
33 }
34}
35
36unsafe impl Send for ReplicaKey {}
37
38unsafe impl Sync for ReplicaKey {}
39
40impl ReplicaKey {
41 pub fn new<S, P>(topic: S, partition: P) -> Self
42 where
43 S: Into<String>,
44 P: Into<i32>,
45 {
46 ReplicaKey {
47 topic: topic.into(),
48 partition: partition.into(),
49 }
50 }
51
52 pub fn split(self) -> (String, i32) {
53 (self.topic, self.partition)
54 }
55}
56
57impl std::fmt::Display for ReplicaKey {
58 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
59 write!(f, "{}-{}", self.topic, self.partition)
60 }
61}
62
63impl<S> From<(S, i32)> for ReplicaKey
64where
65 S: Into<String>,
66{
67 fn from(key: (S, i32)) -> ReplicaKey {
68 ReplicaKey::new(key.0.into(), key.1)
69 }
70}
71
72impl TryFrom<String> for ReplicaKey {
73 type Error = PartitionError;
74
75 fn try_from(value: String) -> Result<Self, PartitionError> {
76 let (topic, partition) = decompose_partition_name(&value)?;
77 Ok(ReplicaKey::new(topic, partition))
78 }
79}
80
/// Error produced when a partition name cannot be interpreted.
#[derive(Debug)]
pub enum PartitionError {
    /// The text is not a valid partition name; the payload is included in the message.
    InvalidSyntax(String),
}

impl fmt::Display for PartitionError {
    /// Formats the error as `invalid partition syntax: <payload>`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::InvalidSyntax(msg) => write!(f, "invalid partition syntax: {}", msg),
        }
    }
}

// Implementing the standard error trait lets callers box this error or propagate it with `?`
// into generic error types; `Debug` and `Display` above satisfy its requirements.
impl std::error::Error for PartitionError {}
93
/// Read-only access to two `i64` offsets exposed by an implementor.
// NOTE(review): the precise meaning of each offset is defined by the implementors, which are
// not visible in this file — confirm before relying on the wording below.
pub trait PartitionOffset {
    /// Returns the last stable offset (presumably the highest offset considered stable/committed).
    fn last_stable_offset(&self) -> i64;

    /// Returns the start offset (presumably the first offset available).
    fn start_offset(&self) -> i64;
}
102
103pub fn decompose_partition_name(partition_name: &str) -> Result<(String, i32), PartitionError> {
105 let dash_pos = partition_name.rfind('-');
106 if dash_pos.is_none() {
107 return Err(PartitionError::InvalidSyntax(partition_name.to_owned()));
108 }
109
110 let pos = dash_pos.unwrap();
111 if (pos + 1) >= partition_name.len() {
112 return Err(PartitionError::InvalidSyntax(partition_name.to_owned()));
113 }
114
115 let topic_name = &partition_name[..pos];
116 let idx_string = &partition_name[(pos + 1)..];
117 let idx = match idx_string.parse::<i32>() {
118 Ok(n) => n,
119 Err(_) => {
120 return Err(PartitionError::InvalidSyntax(partition_name.to_owned()));
121 }
122 };
123
124 Ok((topic_name.to_string(), idx))
125}
126
/// Builds a partition name by joining the topic name and the index with a single `-`.
pub fn create_partition_name(topic_name: &str, idx: &i32) -> String {
    let index_text = idx.to_string();
    [topic_name, index_text.as_str()].join("-")
}