fluvio_sc_schema/objects/
metadata.rs1use std::convert::{TryFrom, TryInto};
2use std::fmt::{Debug, Display};
3use std::io::{Error as IoError, Cursor};
4use std::io::ErrorKind;
5
6use anyhow::Result;
7
8use fluvio_protocol::{Encoder, Decoder, ByteBuf, Version};
9
10use fluvio_controlplane_metadata::store::MetadataStoreObject;
11use fluvio_controlplane_metadata::core::{MetadataContext, MetadataItem};
12use tracing::debug;
13
14use crate::AdminSpec;
15use crate::core::Spec;
16
17use super::DYN_OBJ;
18
19#[derive(Encoder, Decoder, Default, Clone, Debug)]
20#[cfg_attr(
21 feature = "use_serde",
22 derive(serde::Serialize, serde::Deserialize),
23 serde(bound(deserialize = "S: serde::de::DeserializeOwned")),
24 serde(rename_all = "camelCase")
25)]
26pub struct Metadata<S>
27where
28 S: Spec + Encoder + Decoder,
29 S::Status: Encoder + Decoder,
30{
31 pub name: String,
32 pub spec: S,
33 pub status: S::Status,
34}
35
36impl<S, C> From<MetadataStoreObject<S, C>> for Metadata<S>
37where
38 S: Spec + Encoder + Decoder,
39 S::IndexKey: ToString,
40 S::Status: Encoder + Decoder,
41 C: MetadataItem,
42{
43 fn from(meta: MetadataStoreObject<S, C>) -> Self {
44 Self {
45 name: meta.key.to_string(),
46 spec: meta.spec,
47 status: meta.status,
48 }
49 }
50}
51
52impl<S> Metadata<S>
53where
54 S: AdminSpec + Encoder + Decoder,
55 S::Status: Encoder + Decoder,
56{
57 pub fn summary(self) -> Self {
58 Self {
59 name: self.name,
60 spec: self.spec.summary(),
61 status: self.status,
62 }
63 }
64}
65
66impl<S, C> TryFrom<Metadata<S>> for MetadataStoreObject<S, C>
67where
68 S: Spec + Encoder + Decoder,
69 S::Status: Encoder + Decoder,
70 C: MetadataItem,
71 <S as Spec>::IndexKey: TryFrom<String>,
72 <<S as Spec>::IndexKey as TryFrom<String>>::Error: Display,
73{
74 type Error = IoError;
75
76 fn try_from(value: Metadata<S>) -> Result<Self, Self::Error> {
77 Ok(Self {
78 spec: value.spec,
79 status: value.status,
80 key: value.name.try_into().map_err(|err| {
81 IoError::new(ErrorKind::InvalidData, format!("problem converting: {err}"))
82 })?,
83 ctx: MetadataContext::default(),
84 })
85 }
86}
87
88#[derive(Debug, Default)]
90pub struct TypeBuffer {
91 version: Version,
92 ty: String,
93 buf: ByteBuf,
94}
95
96impl TypeBuffer {
97 pub fn encode<S, I>(input: I, version: Version) -> Result<Self>
99 where
100 S: Spec,
101 I: Encoder,
102 {
103 let ty = S::LABEL.to_owned();
104 let mut buf = vec![];
105 input.encode(&mut buf, version)?;
106 Ok(Self {
107 version,
108 ty,
109 buf: ByteBuf::from(buf),
110 })
111 }
112
113 pub fn is_kind_of<S: Spec>(&self) -> bool {
115 self.ty == S::LABEL
116 }
117
118 pub fn downcast<S, O>(&self) -> Result<Option<O>>
121 where
122 S: Spec,
123 O: Decoder + Debug,
124 {
125 if self.is_kind_of::<S>() {
126 debug!(ty = S::LABEL, "downcasting");
127 let mut buf = Cursor::new(self.buf.as_ref());
128 Ok(Some(O::decode_from(&mut buf, self.version)?))
129 } else {
130 debug!(target_ty = S::LABEL, source_t = self.ty, "downcast failed");
131 Ok(None)
132 }
133 }
134
135 pub(crate) fn set_buf(&mut self, version: Version, ty: String, buf: ByteBuf) {
136 self.buf = buf;
137 self.ty = ty;
138 self.version = version;
139 }
140}
141
142impl Encoder for TypeBuffer {
143 fn write_size(&self, version: Version) -> usize {
144 self.ty.write_size(version)
145 + self.buf.len()
146 + (if version >= DYN_OBJ {
147 let u32 = 0;
148 u32.write_size(version)
149 } else {
150 0
151 })
152 }
153
154 fn encode<T>(&self, dest: &mut T, version: Version) -> std::result::Result<(), IoError>
155 where
156 T: fluvio_protocol::bytes::BufMut,
157 {
158 self.ty.encode(dest, version)?;
159 if version >= DYN_OBJ {
160 let len: u32 = self.buf.len() as u32;
161 len.encode(dest, version)?; debug!(len, "encoding using new with");
163 } else {
164 debug!(len = self.buf.len(), "encoding using old with len");
165 }
166 dest.put(self.buf.as_ref());
167
168 Ok(())
169 }
170}
171
172impl Decoder for TypeBuffer {
174 fn decode<T>(&mut self, src: &mut T, version: Version) -> std::result::Result<(), IoError>
175 where
176 T: fluvio_protocol::bytes::Buf,
177 {
178 debug!("decoding tybuffer using new protocol");
179 self.ty.decode(src, version)?;
180 debug!(ty = self.ty, "decoded type");
181 let mut len: u32 = 0;
182 len.decode(src, version)?;
183 tracing::trace!(len, "decoded len");
184 debug!(len, "copy bytes");
185 if src.remaining() < len as usize {
186 return Err(IoError::new(
187 ErrorKind::UnexpectedEof,
188 format!(
189 "not enough bytes, need: {}, remaining: {}",
190 len,
191 src.remaining()
192 ),
193 ));
194 }
195 self.version = version;
196 self.buf = src.copy_to_bytes(len as usize).into();
197
198 Ok(())
199 }
200}