1#![allow(clippy::assign_op_pattern)]
2
3use std::convert::TryFrom;
9use std::io::Error as IoError;
10use std::io::ErrorKind;
11use std::fmt;
12
13use flv_util::socket_helpers::EndPoint as SocketEndPoint;
14use flv_util::socket_helpers::EndPointEncryption;
15use fluvio_types::SpuId;
16use flv_util::socket_helpers::ServerAddress;
17
18use fluvio_protocol::{Encoder, Decoder};
19use fluvio_protocol::bytes::{Buf, BufMut};
20use fluvio_protocol::Version;
21
22#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq, Default)]
23#[cfg_attr(
24 feature = "use_serde",
25 derive(serde::Serialize, serde::Deserialize),
26 serde(rename_all = "camelCase")
27)]
28pub struct SpuSpec {
29 #[cfg_attr(feature = "use_serde", serde(rename = "spuId"))]
30 pub id: SpuId,
31 #[cfg_attr(feature = "use_serde", serde(default))]
32 pub spu_type: SpuType,
33 pub public_endpoint: IngressPort,
34 pub private_endpoint: Endpoint,
35 #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
36 pub rack: Option<String>,
37
38 #[fluvio(min_version = 1)]
39 #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
40 pub public_endpoint_local: Option<Endpoint>,
41}
42
43impl fmt::Display for SpuSpec {
44 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
45 write!(
46 f,
47 "id: {}, type: {}, public: {}",
48 self.id, self.spu_type, self.public_endpoint
49 )
50 }
51}
52
53impl From<SpuId> for SpuSpec {
54 fn from(spec: SpuId) -> Self {
55 Self::new(spec)
56 }
57}
58
59impl SpuSpec {
60 pub fn new(id: SpuId) -> Self {
62 Self {
63 id,
64 ..Default::default()
65 }
66 }
67
68 pub fn new_public_addr(id: SpuId, port: u16, host: String) -> Self {
69 Self {
70 id,
71 public_endpoint: IngressPort {
72 port,
73 ingress: vec![IngressAddr::from_host(host)],
74 ..Default::default()
75 },
76 ..Default::default()
77 }
78 }
79
80 pub fn new_private_addr(id: SpuId, port: u16, host: String) -> Self {
81 Self {
82 id,
83 private_endpoint: Endpoint {
84 port,
85 host,
86 ..Default::default()
87 },
88 ..Default::default()
89 }
90 }
91
92 pub fn set_custom(mut self) -> Self {
93 self.spu_type = SpuType::Custom;
94 self
95 }
96
97 pub fn is_custom(&self) -> bool {
99 match self.spu_type {
100 SpuType::Managed => false,
101 SpuType::Custom => true,
102 }
103 }
104
105 pub fn private_server_address(&self) -> ServerAddress {
106 let private_ep = &self.private_endpoint;
107 ServerAddress {
108 host: private_ep.host.clone(),
109 port: private_ep.port,
110 }
111 }
112
113 pub fn update(&mut self, other: &Self) {
114 if self.rack != other.rack {
115 self.rack.clone_from(&other.rack);
116 }
117 if self.public_endpoint != other.public_endpoint {
118 self.public_endpoint = other.public_endpoint.clone();
119 }
120 if self.private_endpoint != other.private_endpoint {
121 self.private_endpoint = other.private_endpoint.clone();
122 }
123 }
124}
125
126#[derive(Decoder, Encoder, Debug, Clone, Default, Eq, PartialEq)]
129#[cfg_attr(
130 feature = "use_serde",
131 derive(serde::Serialize, serde::Deserialize),
132 serde(rename_all = "camelCase")
133)]
134pub struct CustomSpuSpec {
135 pub id: SpuId,
136 pub public_endpoint: IngressPort,
137 pub private_endpoint: Endpoint,
138 #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
139 pub rack: Option<String>,
140 #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
141 pub public_endpoint_local: Option<Endpoint>,
142}
143
144impl CustomSpuSpec {
145 pub const LABEL: &'static str = "CustomSpu";
146}
147
148impl From<CustomSpuSpec> for SpuSpec {
149 fn from(spec: CustomSpuSpec) -> Self {
150 Self {
151 id: spec.id,
152 public_endpoint: spec.public_endpoint,
153 private_endpoint: spec.private_endpoint,
154 rack: spec.rack,
155 spu_type: SpuType::Custom,
156 public_endpoint_local: spec.public_endpoint_local,
157 }
158 }
159}
160
161impl From<SpuSpec> for CustomSpuSpec {
162 fn from(spu: SpuSpec) -> Self {
163 match spu.spu_type {
164 SpuType::Custom => Self {
165 id: spu.id,
166 public_endpoint: spu.public_endpoint,
167 public_endpoint_local: spu.public_endpoint_local,
168 private_endpoint: spu.private_endpoint,
169 rack: spu.rack,
170 },
171 SpuType::Managed => panic!("managed spu type can't be converted into custom"),
172 }
173 }
174}
175
176#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
177#[cfg_attr(
178 feature = "use_serde",
179 derive(serde::Serialize, serde::Deserialize),
180 serde(rename_all = "camelCase", default)
181)]
182pub struct IngressPort {
183 pub port: u16,
184 pub ingress: Vec<IngressAddr>,
185 pub encryption: EncryptionEnum,
186}
187
188impl fmt::Display for IngressPort {
189 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
190 write!(f, "{}:{}", self.host_string(), self.port)
191 }
192}
193
194impl From<ServerAddress> for IngressPort {
195 fn from(addr: ServerAddress) -> Self {
196 Self {
197 port: addr.port,
198 ingress: vec![IngressAddr::from_host(addr.host)],
199 ..Default::default()
200 }
201 }
202}
203
204impl IngressPort {
205 pub fn from_port_host(port: u16, host: String) -> Self {
206 Self {
207 port,
208 ingress: vec![IngressAddr {
209 hostname: Some(host),
210 ip: None,
211 }],
212 encryption: EncryptionEnum::PLAINTEXT,
213 }
214 }
215
216 pub fn host(&self) -> Option<String> {
218 if self.ingress.is_empty() {
219 None
220 } else {
221 self.ingress[0].host()
222 }
223 }
224
225 pub fn host_string(&self) -> String {
226 match self.host() {
227 Some(host_val) => host_val,
228 None => "".to_owned(),
229 }
230 }
231
232 pub fn addr(&self) -> String {
234 format!("{}:{}", self.host_string(), self.port)
235 }
236}
237
238#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
239#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
240pub struct IngressAddr {
241 pub hostname: Option<String>,
242 pub ip: Option<String>,
243}
244
245impl IngressAddr {
246 pub fn from_host(hostname: String) -> Self {
247 Self {
248 hostname: Some(hostname),
249 ..Default::default()
250 }
251 }
252
253 pub fn from_ip(ip: String) -> Self {
254 Self {
255 ip: Some(ip),
256 ..Default::default()
257 }
258 }
259
260 pub fn host(&self) -> Option<String> {
261 self.hostname.clone().or_else(|| self.ip.clone())
262 }
263}
264
265#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
266#[cfg_attr(
267 feature = "use_serde",
268 derive(serde::Serialize, serde::Deserialize),
269 serde(rename_all = "camelCase")
270)]
271pub struct Endpoint {
272 pub port: u16,
273 pub host: String,
274 pub encryption: EncryptionEnum,
275}
276
277impl From<ServerAddress> for Endpoint {
278 fn from(addr: ServerAddress) -> Self {
279 Self {
280 port: addr.port,
281 host: addr.host,
282 ..Default::default()
283 }
284 }
285}
286
287impl fmt::Display for Endpoint {
288 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
289 write!(f, "{}:{}", self.host, self.port)
290 }
291}
292
293impl TryFrom<&Endpoint> for SocketEndPoint {
294 type Error = IoError;
295
296 fn try_from(endpoint: &Endpoint) -> Result<Self, Self::Error> {
297 flv_util::socket_helpers::host_port_to_socket_addr(&endpoint.host, endpoint.port).map(
298 |addr| SocketEndPoint {
299 addr,
300 encryption: EndPointEncryption::PLAINTEXT,
301 },
302 )
303 }
304}
305
306#[allow(dead_code)]
307impl TryFrom<&Endpoint> for std::net::SocketAddr {
308 type Error = IoError;
309
310 fn try_from(endpoint: &Endpoint) -> Result<Self, Self::Error> {
311 flv_util::socket_helpers::host_port_to_socket_addr(&endpoint.host, endpoint.port)
312 }
313}
314
315impl Default for Endpoint {
316 fn default() -> Self {
317 Self {
318 host: "127.0.0.1".to_owned(),
319 port: 0,
320 encryption: EncryptionEnum::default(),
321 }
322 }
323}
324
325impl Endpoint {
326 pub fn from_port_host(port: u16, host: String) -> Self {
327 Self {
328 port,
329 host,
330 encryption: EncryptionEnum::PLAINTEXT,
331 }
332 }
333}
334
335#[derive(Default, Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
336#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
337pub enum EncryptionEnum {
338 #[default]
339 #[fluvio(tag = 0)]
340 PLAINTEXT,
341 #[fluvio(tag = 1)]
342 SSL,
343}
344
345#[derive(Debug, Default, Clone, Eq, PartialEq, Encoder, Decoder)]
346#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
347#[derive()]
348pub enum SpuType {
349 #[default]
350 #[fluvio(tag = 0)]
351 Managed,
352 #[fluvio(tag = 1)]
353 Custom,
354}
355
356impl SpuType {
358 pub fn type_label(&self) -> &str {
359 match self {
360 Self::Managed => "managed",
361 Self::Custom => "custom",
362 }
363 }
364}
365
366impl fmt::Display for SpuType {
367 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
368 write!(f, "{:#?}", self.type_label())
369 }
370}
371
372#[derive(Debug)]
373pub enum CustomSpu {
374 Name(String),
375 Id(i32),
376}
377
378impl Default for CustomSpu {
382 fn default() -> CustomSpu {
383 Self::Name("".to_string())
384 }
385}
386
387impl Encoder for CustomSpu {
388 fn write_size(&self, version: Version) -> usize {
390 let type_size = (0u8).write_size(version);
391 match self {
392 Self::Name(name) => type_size + name.write_size(version),
393 Self::Id(id) => type_size + id.write_size(version),
394 }
395 }
396
397 fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), IoError>
399 where
400 T: BufMut,
401 {
402 if dest.remaining_mut() < self.write_size(version) {
404 return Err(IoError::new(
405 ErrorKind::UnexpectedEof,
406 format!(
407 "not enough capacity for custom spu len of {}",
408 self.write_size(version)
409 ),
410 ));
411 }
412
413 match self {
414 Self::Name(name) => {
415 let typ: u8 = 0;
416 typ.encode(dest, version)?;
417 name.encode(dest, version)?;
418 }
419 Self::Id(id) => {
420 let typ: u8 = 1;
421 typ.encode(dest, version)?;
422 id.encode(dest, version)?;
423 }
424 }
425
426 Ok(())
427 }
428}
429
430impl Decoder for CustomSpu {
431 fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), IoError>
432 where
433 T: Buf,
434 {
435 let mut value: u8 = 0;
436 value.decode(src, version)?;
437 match value {
438 0 => {
439 let mut name: String = String::default();
440 name.decode(src, version)?;
441 *self = Self::Name(name)
442 }
443 1 => {
444 let mut id: i32 = 0;
445 id.decode(src, version)?;
446 *self = Self::Id(id)
447 }
448 _ => {
449 return Err(IoError::new(
450 ErrorKind::UnexpectedEof,
451 format!("invalid value for Custom Spu: {value}"),
452 ));
453 }
454 }
455
456 Ok(())
457 }
458}