fluvio_sc_schema/objects/
create.rs1use std::fmt::Debug;
2
3use anyhow::Result;
4
5use fluvio_protocol::{Encoder, Decoder, Version};
6use fluvio_protocol::api::Request;
7
8use crate::{AdminPublicApiKey, CreatableAdminSpec, Status, TryEncodableFrom};
9
10use super::COMMON_VERSION;
11
12#[derive(Encoder, Decoder, Default, Debug, Clone)]
13pub struct CommonCreateRequest {
14 pub name: String,
15 pub dry_run: bool,
16 #[fluvio(min_version = 7)]
17 pub timeout: Option<u32>, }
19
20#[derive(Encoder, Decoder, Default, Debug, Clone)]
22pub struct CreateRequest<S> {
23 pub common: CommonCreateRequest,
24 pub request: S,
25}
26
27impl<S> CreateRequest<S> {
28 pub fn new(common: CommonCreateRequest, request: S) -> Self {
29 Self { common, request }
30 }
31
32 pub fn parts(self) -> (CommonCreateRequest, S) {
34 (self.common, self.request)
35 }
36}
37
38#[derive(Debug, Default, Encoder)]
39pub struct ObjectApiCreateRequest(CreateTypeBuffer); impl Request for ObjectApiCreateRequest {
42 const API_KEY: u16 = AdminPublicApiKey::Create as u16;
43 const MIN_API_VERSION: i16 = 9;
44 const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
45 type Response = Status;
46}
47
48impl<S> TryEncodableFrom<CreateRequest<S>> for ObjectApiCreateRequest
49where
50 CreateRequest<S>: Encoder + Decoder + Debug,
51 S: CreatableAdminSpec,
52{
53 fn try_encode_from(input: CreateRequest<S>, version: Version) -> Result<Self> {
54 Ok(Self(CreateTypeBuffer::encode(input, version)?))
55 }
56
57 fn downcast(&self) -> Result<Option<CreateRequest<S>>> {
58 self.0.downcast::<S>()
59 }
60}
61
62use classic::*;
63
64mod classic {
66
67 use std::io::{Error as IoError, ErrorKind, Cursor};
68 use std::fmt::Debug;
69
70 use anyhow::Result;
71
72 use fluvio_controlplane_metadata::core::Spec;
73 use fluvio_protocol::{Decoder, ByteBuf, Version, Encoder};
74 use tracing::debug;
75
76 use crate::CreatableAdminSpec;
77 use crate::objects::classic::{ClassicCreatableAdminSpec, ClassicObjectApiCreateRequest};
78 use crate::objects::{COMMON_VERSION, DYN_OBJ};
79
80 use super::{ObjectApiCreateRequest, CreateRequest};
81
82 impl Decoder for ObjectApiCreateRequest {
85 fn decode<T>(
86 &mut self,
87 src: &mut T,
88 version: fluvio_protocol::Version,
89 ) -> Result<(), std::io::Error>
90 where
91 T: fluvio_protocol::bytes::Buf,
92 {
93 if version >= crate::objects::DYN_OBJ {
94 debug!("decoding new");
95 self.0.decode(src, version)?;
96 } else {
97 debug!("decoding classical");
98
99 let classic_obj = ClassicObjectApiCreateRequest::decode_from(src, version)?;
100 let ty = classic_obj.request.type_string();
101 self.0.set_buf(
103 version,
104 ty.to_owned(),
105 classic_obj.as_bytes(COMMON_VERSION)?.into(),
106 );
107 }
108 Ok(())
109 }
110 }
111
112 #[derive(Debug, Default)]
115 pub(crate) struct CreateTypeBuffer {
116 version: Version,
117 ty: String,
118 buf: ByteBuf, }
120
121 impl CreateTypeBuffer {
122 pub(crate) fn encode<S>(input: CreateRequest<S>, version: Version) -> Result<Self>
124 where
125 S: CreatableAdminSpec,
126 {
127 let ty = S::LABEL.to_owned();
128 let mut buf = vec![];
129 if version >= DYN_OBJ {
130 input.encode(&mut buf, version)?;
131 } else {
132 debug!("encoding classic");
133 let parts = input.parts();
135 let request = <S as ClassicCreatableAdminSpec>::try_classic_convert(parts.1)?;
136 let create_api_request = ClassicObjectApiCreateRequest {
137 common: parts.0,
138 request,
139 };
140 create_api_request.encode(&mut buf, version)?;
141 }
142 Ok(Self {
143 version,
144 ty,
145 buf: ByteBuf::from(buf),
146 })
147 }
148
149 pub fn is_kind_of<S: Spec>(&self) -> bool {
151 self.ty == S::LABEL
152 }
153
154 pub fn downcast<S>(&self) -> Result<Option<CreateRequest<S>>>
157 where
158 S: CreatableAdminSpec,
159 {
160 if self.is_kind_of::<S>() {
161 debug!(ty = S::LABEL, "downcast kind");
162 let mut buf = Cursor::new(self.buf.as_ref());
163 if self.version < DYN_OBJ {
164 let classic_obj =
165 ClassicObjectApiCreateRequest::decode_from(&mut buf, self.version)?;
166 let ClassicObjectApiCreateRequest { common, request } = classic_obj;
167 let new_request =
168 match <S as ClassicCreatableAdminSpec>::try_convert_from_classic(request) {
169 Some(new_request) => new_request,
170 None => return Ok(None),
171 };
172 Ok(Some(CreateRequest::new(common, new_request)))
173 } else {
174 Ok(Some(CreateRequest::<S>::decode_from(
175 &mut buf,
176 self.version,
177 )?))
178 }
179 } else {
180 debug!(target_ty = S::LABEL, my_ty = self.ty, "different kind");
181 Ok(None)
182 }
183 }
184
185 pub(crate) fn set_buf(&mut self, version: Version, ty: String, buf: ByteBuf) {
186 self.buf = buf;
187 self.ty = ty;
188 self.version = version;
189 }
190 }
191
192 impl Encoder for CreateTypeBuffer {
193 fn write_size(&self, version: Version) -> usize {
194 if version >= DYN_OBJ {
195 self.ty.write_size(version) + 0_u32.write_size(version) + self.buf.len()
196 } else {
197 self.buf.len()
198 }
199 }
200
201 fn encode<T>(&self, dest: &mut T, version: Version) -> std::result::Result<(), IoError>
202 where
203 T: fluvio_protocol::bytes::BufMut,
204 {
205 if version >= DYN_OBJ {
206 self.ty.encode(dest, version)?;
207 let len: u32 = self.buf.len() as u32;
208 len.encode(dest, version)?; debug!(len, "encoding using new");
210 } else {
211 debug!(len = self.buf.len(), "encoding using old protocol");
212 }
213 dest.put(self.buf.as_ref());
214
215 Ok(())
216 }
217 }
218
219 impl Decoder for CreateTypeBuffer {
222 fn decode<T>(&mut self, src: &mut T, version: Version) -> std::result::Result<(), IoError>
223 where
224 T: fluvio_protocol::bytes::Buf,
225 {
226 debug!("decoding tybuffer using new protocol");
227 self.ty.decode(src, version)?;
228 tracing::trace!(ty = self.ty, "decoded type");
229 debug!(ty = self.ty, "decoded type");
230
231 let mut len: u32 = 0;
232 len.decode(src, version)?;
233 tracing::trace!(len, "decoded len");
234 debug!(len, "copy bytes");
235 if src.remaining() < len as usize {
236 return Err(IoError::new(
237 ErrorKind::UnexpectedEof,
238 format!(
239 "not enough bytes, need: {}, remaining: {}",
240 len,
241 src.remaining()
242 ),
243 ));
244 }
245 self.buf = src.copy_to_bytes(len as usize).into();
246 self.version = version;
247
248 Ok(())
249 }
250 }
251}