fluvio_controlplane_metadata/spu/
spec.rs

1#![allow(clippy::assign_op_pattern)]
2
3//!
4//! # Spu Spec
5//!
6//! Spu Spec metadata information cached locally.
7//!
8use std::convert::TryFrom;
9use std::io::Error as IoError;
10use std::io::ErrorKind;
11use std::fmt;
12
13use flv_util::socket_helpers::EndPoint as SocketEndPoint;
14use flv_util::socket_helpers::EndPointEncryption;
15use fluvio_types::SpuId;
16use flv_util::socket_helpers::ServerAddress;
17
18use fluvio_protocol::{Encoder, Decoder};
19use fluvio_protocol::bytes::{Buf, BufMut};
20use fluvio_protocol::Version;
21
22#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq, Default)]
23#[cfg_attr(
24    feature = "use_serde",
25    derive(serde::Serialize, serde::Deserialize),
26    serde(rename_all = "camelCase")
27)]
28pub struct SpuSpec {
29    #[cfg_attr(feature = "use_serde", serde(rename = "spuId"))]
30    pub id: SpuId,
31    #[cfg_attr(feature = "use_serde", serde(default))]
32    pub spu_type: SpuType,
33    pub public_endpoint: IngressPort,
34    pub private_endpoint: Endpoint,
35    #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
36    pub rack: Option<String>,
37
38    #[fluvio(min_version = 1)]
39    #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
40    pub public_endpoint_local: Option<Endpoint>,
41}
42
43impl fmt::Display for SpuSpec {
44    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
45        write!(
46            f,
47            "id: {}, type: {}, public: {}",
48            self.id, self.spu_type, self.public_endpoint
49        )
50    }
51}
52
53impl From<SpuId> for SpuSpec {
54    fn from(spec: SpuId) -> Self {
55        Self::new(spec)
56    }
57}
58
59impl SpuSpec {
60    /// Given an Spu id generate a new SpuSpec
61    pub fn new(id: SpuId) -> Self {
62        Self {
63            id,
64            ..Default::default()
65        }
66    }
67
68    pub fn new_public_addr(id: SpuId, port: u16, host: String) -> Self {
69        Self {
70            id,
71            public_endpoint: IngressPort {
72                port,
73                ingress: vec![IngressAddr::from_host(host)],
74                ..Default::default()
75            },
76            ..Default::default()
77        }
78    }
79
80    pub fn new_private_addr(id: SpuId, port: u16, host: String) -> Self {
81        Self {
82            id,
83            private_endpoint: Endpoint {
84                port,
85                host,
86                ..Default::default()
87            },
88            ..Default::default()
89        }
90    }
91
92    pub fn set_custom(mut self) -> Self {
93        self.spu_type = SpuType::Custom;
94        self
95    }
96
97    /// Return custom type: true for custom, false otherwise
98    pub fn is_custom(&self) -> bool {
99        match self.spu_type {
100            SpuType::Managed => false,
101            SpuType::Custom => true,
102        }
103    }
104
105    pub fn private_server_address(&self) -> ServerAddress {
106        let private_ep = &self.private_endpoint;
107        ServerAddress {
108            host: private_ep.host.clone(),
109            port: private_ep.port,
110        }
111    }
112
113    pub fn update(&mut self, other: &Self) {
114        if self.rack != other.rack {
115            self.rack.clone_from(&other.rack);
116        }
117        if self.public_endpoint != other.public_endpoint {
118            self.public_endpoint = other.public_endpoint.clone();
119        }
120        if self.private_endpoint != other.private_endpoint {
121            self.private_endpoint = other.private_endpoint.clone();
122        }
123    }
124}
125
126/// Custom Spu Spec
127/// This is not real spec since when this is stored on metadata store, it will be stored as SPU
128#[derive(Decoder, Encoder, Debug, Clone, Default, Eq, PartialEq)]
129#[cfg_attr(
130    feature = "use_serde",
131    derive(serde::Serialize, serde::Deserialize),
132    serde(rename_all = "camelCase")
133)]
134pub struct CustomSpuSpec {
135    pub id: SpuId,
136    pub public_endpoint: IngressPort,
137    pub private_endpoint: Endpoint,
138    #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
139    pub rack: Option<String>,
140    #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
141    pub public_endpoint_local: Option<Endpoint>,
142}
143
144impl CustomSpuSpec {
145    pub const LABEL: &'static str = "CustomSpu";
146}
147
148impl From<CustomSpuSpec> for SpuSpec {
149    fn from(spec: CustomSpuSpec) -> Self {
150        Self {
151            id: spec.id,
152            public_endpoint: spec.public_endpoint,
153            private_endpoint: spec.private_endpoint,
154            rack: spec.rack,
155            spu_type: SpuType::Custom,
156            public_endpoint_local: spec.public_endpoint_local,
157        }
158    }
159}
160
161impl From<SpuSpec> for CustomSpuSpec {
162    fn from(spu: SpuSpec) -> Self {
163        match spu.spu_type {
164            SpuType::Custom => Self {
165                id: spu.id,
166                public_endpoint: spu.public_endpoint,
167                public_endpoint_local: spu.public_endpoint_local,
168                private_endpoint: spu.private_endpoint,
169                rack: spu.rack,
170            },
171            SpuType::Managed => panic!("managed spu type can't be converted into custom"),
172        }
173    }
174}
175
176#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
177#[cfg_attr(
178    feature = "use_serde",
179    derive(serde::Serialize, serde::Deserialize),
180    serde(rename_all = "camelCase", default)
181)]
182pub struct IngressPort {
183    pub port: u16,
184    pub ingress: Vec<IngressAddr>,
185    pub encryption: EncryptionEnum,
186}
187
188impl fmt::Display for IngressPort {
189    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
190        write!(f, "{}:{}", self.host_string(), self.port)
191    }
192}
193
194impl From<ServerAddress> for IngressPort {
195    fn from(addr: ServerAddress) -> Self {
196        Self {
197            port: addr.port,
198            ingress: vec![IngressAddr::from_host(addr.host)],
199            ..Default::default()
200        }
201    }
202}
203
204impl IngressPort {
205    pub fn from_port_host(port: u16, host: String) -> Self {
206        Self {
207            port,
208            ingress: vec![IngressAddr {
209                hostname: Some(host),
210                ip: None,
211            }],
212            encryption: EncryptionEnum::PLAINTEXT,
213        }
214    }
215
216    // return any host whether it is IP or String
217    pub fn host(&self) -> Option<String> {
218        if self.ingress.is_empty() {
219            None
220        } else {
221            self.ingress[0].host()
222        }
223    }
224
225    pub fn host_string(&self) -> String {
226        match self.host() {
227            Some(host_val) => host_val,
228            None => "".to_owned(),
229        }
230    }
231
232    // convert to host:addr format
233    pub fn addr(&self) -> String {
234        format!("{}:{}", self.host_string(), self.port)
235    }
236}
237
238#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
239#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
240pub struct IngressAddr {
241    pub hostname: Option<String>,
242    pub ip: Option<String>,
243}
244
245impl IngressAddr {
246    pub fn from_host(hostname: String) -> Self {
247        Self {
248            hostname: Some(hostname),
249            ..Default::default()
250        }
251    }
252
253    pub fn from_ip(ip: String) -> Self {
254        Self {
255            ip: Some(ip),
256            ..Default::default()
257        }
258    }
259
260    pub fn host(&self) -> Option<String> {
261        self.hostname.clone().or_else(|| self.ip.clone())
262    }
263}
264
265#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
266#[cfg_attr(
267    feature = "use_serde",
268    derive(serde::Serialize, serde::Deserialize),
269    serde(rename_all = "camelCase")
270)]
271pub struct Endpoint {
272    pub port: u16,
273    pub host: String,
274    pub encryption: EncryptionEnum,
275}
276
277impl From<ServerAddress> for Endpoint {
278    fn from(addr: ServerAddress) -> Self {
279        Self {
280            port: addr.port,
281            host: addr.host,
282            ..Default::default()
283        }
284    }
285}
286
287impl fmt::Display for Endpoint {
288    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
289        write!(f, "{}:{}", self.host, self.port)
290    }
291}
292
293impl TryFrom<&Endpoint> for SocketEndPoint {
294    type Error = IoError;
295
296    fn try_from(endpoint: &Endpoint) -> Result<Self, Self::Error> {
297        flv_util::socket_helpers::host_port_to_socket_addr(&endpoint.host, endpoint.port).map(
298            |addr| SocketEndPoint {
299                addr,
300                encryption: EndPointEncryption::PLAINTEXT,
301            },
302        )
303    }
304}
305
306#[allow(dead_code)]
307impl TryFrom<&Endpoint> for std::net::SocketAddr {
308    type Error = IoError;
309
310    fn try_from(endpoint: &Endpoint) -> Result<Self, Self::Error> {
311        flv_util::socket_helpers::host_port_to_socket_addr(&endpoint.host, endpoint.port)
312    }
313}
314
315impl Default for Endpoint {
316    fn default() -> Self {
317        Self {
318            host: "127.0.0.1".to_owned(),
319            port: 0,
320            encryption: EncryptionEnum::default(),
321        }
322    }
323}
324
325impl Endpoint {
326    pub fn from_port_host(port: u16, host: String) -> Self {
327        Self {
328            port,
329            host,
330            encryption: EncryptionEnum::PLAINTEXT,
331        }
332    }
333}
334
335#[derive(Default, Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
336#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
337pub enum EncryptionEnum {
338    #[default]
339    #[fluvio(tag = 0)]
340    PLAINTEXT,
341    #[fluvio(tag = 1)]
342    SSL,
343}
344
345#[derive(Debug, Default, Clone, Eq, PartialEq, Encoder, Decoder)]
346#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
347#[derive()]
348pub enum SpuType {
349    #[default]
350    #[fluvio(tag = 0)]
351    Managed,
352    #[fluvio(tag = 1)]
353    Custom,
354}
355
356/// Return type label in String format
357impl SpuType {
358    pub fn type_label(&self) -> &str {
359        match self {
360            Self::Managed => "managed",
361            Self::Custom => "custom",
362        }
363    }
364}
365
366impl fmt::Display for SpuType {
367    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
368        write!(f, "{:#?}", self.type_label())
369    }
370}
371
372#[derive(Debug)]
373pub enum CustomSpu {
374    Name(String),
375    Id(i32),
376}
377
378// -----------------------------------
379// Implementation - CustomSpu
380// -----------------------------------
381impl Default for CustomSpu {
382    fn default() -> CustomSpu {
383        Self::Name("".to_string())
384    }
385}
386
387impl Encoder for CustomSpu {
388    // compute size
389    fn write_size(&self, version: Version) -> usize {
390        let type_size = (0u8).write_size(version);
391        match self {
392            Self::Name(name) => type_size + name.write_size(version),
393            Self::Id(id) => type_size + id.write_size(version),
394        }
395    }
396
397    // encode match
398    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), IoError>
399    where
400        T: BufMut,
401    {
402        // ensure buffer is large enough
403        if dest.remaining_mut() < self.write_size(version) {
404            return Err(IoError::new(
405                ErrorKind::UnexpectedEof,
406                format!(
407                    "not enough capacity for custom spu len of {}",
408                    self.write_size(version)
409                ),
410            ));
411        }
412
413        match self {
414            Self::Name(name) => {
415                let typ: u8 = 0;
416                typ.encode(dest, version)?;
417                name.encode(dest, version)?;
418            }
419            Self::Id(id) => {
420                let typ: u8 = 1;
421                typ.encode(dest, version)?;
422                id.encode(dest, version)?;
423            }
424        }
425
426        Ok(())
427    }
428}
429
430impl Decoder for CustomSpu {
431    fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), IoError>
432    where
433        T: Buf,
434    {
435        let mut value: u8 = 0;
436        value.decode(src, version)?;
437        match value {
438            0 => {
439                let mut name: String = String::default();
440                name.decode(src, version)?;
441                *self = Self::Name(name)
442            }
443            1 => {
444                let mut id: i32 = 0;
445                id.decode(src, version)?;
446                *self = Self::Id(id)
447            }
448            _ => {
449                return Err(IoError::new(
450                    ErrorKind::UnexpectedEof,
451                    format!("invalid value for Custom Spu: {value}"),
452                ));
453            }
454        }
455
456        Ok(())
457    }
458}