fluvio_sc_schema/objects/
metadata.rs

1use std::convert::{TryFrom, TryInto};
2use std::fmt::{Debug, Display};
3use std::io::{Error as IoError, Cursor};
4use std::io::ErrorKind;
5
6use anyhow::Result;
7
8use fluvio_protocol::{Encoder, Decoder, ByteBuf, Version};
9
10use fluvio_controlplane_metadata::store::MetadataStoreObject;
11use fluvio_controlplane_metadata::core::{MetadataContext, MetadataItem};
12use tracing::debug;
13
14use crate::AdminSpec;
15use crate::core::Spec;
16
17use super::DYN_OBJ;
18
19#[derive(Encoder, Decoder, Default, Clone, Debug)]
20#[cfg_attr(
21    feature = "use_serde",
22    derive(serde::Serialize, serde::Deserialize),
23    serde(bound(deserialize = "S: serde::de::DeserializeOwned")),
24    serde(rename_all = "camelCase")
25)]
26pub struct Metadata<S>
27where
28    S: Spec + Encoder + Decoder,
29    S::Status: Encoder + Decoder,
30{
31    pub name: String,
32    pub spec: S,
33    pub status: S::Status,
34}
35
36impl<S, C> From<MetadataStoreObject<S, C>> for Metadata<S>
37where
38    S: Spec + Encoder + Decoder,
39    S::IndexKey: ToString,
40    S::Status: Encoder + Decoder,
41    C: MetadataItem,
42{
43    fn from(meta: MetadataStoreObject<S, C>) -> Self {
44        Self {
45            name: meta.key.to_string(),
46            spec: meta.spec,
47            status: meta.status,
48        }
49    }
50}
51
52impl<S> Metadata<S>
53where
54    S: AdminSpec + Encoder + Decoder,
55    S::Status: Encoder + Decoder,
56{
57    pub fn summary(self) -> Self {
58        Self {
59            name: self.name,
60            spec: self.spec.summary(),
61            status: self.status,
62        }
63    }
64}
65
66impl<S, C> TryFrom<Metadata<S>> for MetadataStoreObject<S, C>
67where
68    S: Spec + Encoder + Decoder,
69    S::Status: Encoder + Decoder,
70    C: MetadataItem,
71    <S as Spec>::IndexKey: TryFrom<String>,
72    <<S as Spec>::IndexKey as TryFrom<String>>::Error: Display,
73{
74    type Error = IoError;
75
76    fn try_from(value: Metadata<S>) -> Result<Self, Self::Error> {
77        Ok(Self {
78            spec: value.spec,
79            status: value.status,
80            key: value.name.try_into().map_err(|err| {
81                IoError::new(ErrorKind::InvalidData, format!("problem converting: {err}"))
82            })?,
83            ctx: MetadataContext::default(),
84        })
85    }
86}
87
88/// Type encoded buffer, it uses type label to determine type
89#[derive(Debug, Default)]
90pub struct TypeBuffer {
91    version: Version,
92    ty: String,
93    buf: ByteBuf,
94}
95
96impl TypeBuffer {
97    // encode admin spec into a request
98    pub fn encode<S, I>(input: I, version: Version) -> Result<Self>
99    where
100        S: Spec,
101        I: Encoder,
102    {
103        let ty = S::LABEL.to_owned();
104        let mut buf = vec![];
105        input.encode(&mut buf, version)?;
106        Ok(Self {
107            version,
108            ty,
109            buf: ByteBuf::from(buf),
110        })
111    }
112
113    // check if this object is kind of spec
114    pub fn is_kind_of<S: Spec>(&self) -> bool {
115        self.ty == S::LABEL
116    }
117
118    // downcast to specific spec type and return object
119    // if doens't match to ty, return None
120    pub fn downcast<S, O>(&self) -> Result<Option<O>>
121    where
122        S: Spec,
123        O: Decoder + Debug,
124    {
125        if self.is_kind_of::<S>() {
126            debug!(ty = S::LABEL, "downcasting");
127            let mut buf = Cursor::new(self.buf.as_ref());
128            Ok(Some(O::decode_from(&mut buf, self.version)?))
129        } else {
130            debug!(target_ty = S::LABEL, source_t = self.ty, "downcast failed");
131            Ok(None)
132        }
133    }
134
135    pub(crate) fn set_buf(&mut self, version: Version, ty: String, buf: ByteBuf) {
136        self.buf = buf;
137        self.ty = ty;
138        self.version = version;
139    }
140}
141
142impl Encoder for TypeBuffer {
143    fn write_size(&self, version: Version) -> usize {
144        self.ty.write_size(version)
145            + self.buf.len()
146            + (if version >= DYN_OBJ {
147                let u32 = 0;
148                u32.write_size(version)
149            } else {
150                0
151            })
152    }
153
154    fn encode<T>(&self, dest: &mut T, version: Version) -> std::result::Result<(), IoError>
155    where
156        T: fluvio_protocol::bytes::BufMut,
157    {
158        self.ty.encode(dest, version)?;
159        if version >= DYN_OBJ {
160            let len: u32 = self.buf.len() as u32;
161            len.encode(dest, version)?; // write len
162            debug!(len, "encoding using new with");
163        } else {
164            debug!(len = self.buf.len(), "encoding using old with len");
165        }
166        dest.put(self.buf.as_ref());
167
168        Ok(())
169    }
170}
171
172// this is always using new protocol, classical decoding is done before by caller
173impl Decoder for TypeBuffer {
174    fn decode<T>(&mut self, src: &mut T, version: Version) -> std::result::Result<(), IoError>
175    where
176        T: fluvio_protocol::bytes::Buf,
177    {
178        debug!("decoding tybuffer using new protocol");
179        self.ty.decode(src, version)?;
180        debug!(ty = self.ty, "decoded type");
181        let mut len: u32 = 0;
182        len.decode(src, version)?;
183        tracing::trace!(len, "decoded len");
184        debug!(len, "copy bytes");
185        if src.remaining() < len as usize {
186            return Err(IoError::new(
187                ErrorKind::UnexpectedEof,
188                format!(
189                    "not enough bytes, need: {}, remaining: {}",
190                    len,
191                    src.remaining()
192                ),
193            ));
194        }
195        self.version = version;
196        self.buf = src.copy_to_bytes(len as usize).into();
197
198        Ok(())
199    }
200}