fluvio_sc_schema/objects/
create.rs

1use std::fmt::Debug;
2
3use anyhow::Result;
4
5use fluvio_protocol::{Encoder, Decoder, Version};
6use fluvio_protocol::api::Request;
7
8use crate::{AdminPublicApiKey, CreatableAdminSpec, Status, TryEncodableFrom};
9
10use super::COMMON_VERSION;
11
12#[derive(Encoder, Decoder, Default, Debug, Clone)]
13pub struct CommonCreateRequest {
14    pub name: String,
15    pub dry_run: bool,
16    #[fluvio(min_version = 7)]
17    pub timeout: Option<u32>, // timeout in milliseconds
18}
19
20/// Every create request must have this parameters
21#[derive(Encoder, Decoder, Default, Debug, Clone)]
22pub struct CreateRequest<S> {
23    pub common: CommonCreateRequest,
24    pub request: S,
25}
26
27impl<S> CreateRequest<S> {
28    pub fn new(common: CommonCreateRequest, request: S) -> Self {
29        Self { common, request }
30    }
31
32    /// deconstruct
33    pub fn parts(self) -> (CommonCreateRequest, S) {
34        (self.common, self.request)
35    }
36}
37
38#[derive(Debug, Default, Encoder)]
39pub struct ObjectApiCreateRequest(CreateTypeBuffer); // replace with CreateTypeBuffer with TypeBuffer after
40
41impl Request for ObjectApiCreateRequest {
42    const API_KEY: u16 = AdminPublicApiKey::Create as u16;
43    const MIN_API_VERSION: i16 = 9;
44    const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
45    type Response = Status;
46}
47
48impl<S> TryEncodableFrom<CreateRequest<S>> for ObjectApiCreateRequest
49where
50    CreateRequest<S>: Encoder + Decoder + Debug,
51    S: CreatableAdminSpec,
52{
53    fn try_encode_from(input: CreateRequest<S>, version: Version) -> Result<Self> {
54        Ok(Self(CreateTypeBuffer::encode(input, version)?))
55    }
56
57    fn downcast(&self) -> Result<Option<CreateRequest<S>>> {
58        self.0.downcast::<S>()
59    }
60}
61
62use classic::*;
63
64// backward compatibility with classic protocol. this should go away once we deprecate classic
65mod classic {
66
67    use std::io::{Error as IoError, ErrorKind, Cursor};
68    use std::fmt::Debug;
69
70    use anyhow::Result;
71
72    use fluvio_controlplane_metadata::core::Spec;
73    use fluvio_protocol::{Decoder, ByteBuf, Version, Encoder};
74    use tracing::debug;
75
76    use crate::CreatableAdminSpec;
77    use crate::objects::classic::{ClassicCreatableAdminSpec, ClassicObjectApiCreateRequest};
78    use crate::objects::{COMMON_VERSION, DYN_OBJ};
79
80    use super::{ObjectApiCreateRequest, CreateRequest};
81
82    // This sections for compatibility with classic protocol, should go away once we deprecate classic
83
84    impl Decoder for ObjectApiCreateRequest {
85        fn decode<T>(
86            &mut self,
87            src: &mut T,
88            version: fluvio_protocol::Version,
89        ) -> Result<(), std::io::Error>
90        where
91            T: fluvio_protocol::bytes::Buf,
92        {
93            if version >= crate::objects::DYN_OBJ {
94                debug!("decoding new");
95                self.0.decode(src, version)?;
96            } else {
97                debug!("decoding classical");
98
99                let classic_obj = ClassicObjectApiCreateRequest::decode_from(src, version)?;
100                let ty = classic_obj.request.type_string();
101                // reencode using new version
102                self.0.set_buf(
103                    version,
104                    ty.to_owned(),
105                    classic_obj.as_bytes(COMMON_VERSION)?.into(),
106                );
107            }
108            Ok(())
109        }
110    }
111
112    /// This is same as TypeBuffer, but need to have for create because
113    /// classic protocol treated differently.  Once classic protocol is deprecated, we can remove this
114    #[derive(Debug, Default)]
115    pub(crate) struct CreateTypeBuffer {
116        version: Version,
117        ty: String,
118        buf: ByteBuf, // for classical, we stored in the old way
119    }
120
121    impl CreateTypeBuffer {
122        // since this is create, we can specialize it
123        pub(crate) fn encode<S>(input: CreateRequest<S>, version: Version) -> Result<Self>
124        where
125            S: CreatableAdminSpec,
126        {
127            let ty = S::LABEL.to_owned();
128            let mut buf = vec![];
129            if version >= DYN_OBJ {
130                input.encode(&mut buf, version)?;
131            } else {
132                debug!("encoding classic");
133                // for classical, we use old way
134                let parts = input.parts();
135                let request = <S as ClassicCreatableAdminSpec>::try_classic_convert(parts.1)?;
136                let create_api_request = ClassicObjectApiCreateRequest {
137                    common: parts.0,
138                    request,
139                };
140                create_api_request.encode(&mut buf, version)?;
141            }
142            Ok(Self {
143                version,
144                ty,
145                buf: ByteBuf::from(buf),
146            })
147        }
148
149        // check if this object is kind of spec
150        pub fn is_kind_of<S: Spec>(&self) -> bool {
151            self.ty == S::LABEL
152        }
153
154        // downcast to specific spec type and return object
155        // if doens't match to ty, return None
156        pub fn downcast<S>(&self) -> Result<Option<CreateRequest<S>>>
157        where
158            S: CreatableAdminSpec,
159        {
160            if self.is_kind_of::<S>() {
161                debug!(ty = S::LABEL, "downcast kind");
162                let mut buf = Cursor::new(self.buf.as_ref());
163                if self.version < DYN_OBJ {
164                    let classic_obj =
165                        ClassicObjectApiCreateRequest::decode_from(&mut buf, self.version)?;
166                    let ClassicObjectApiCreateRequest { common, request } = classic_obj;
167                    let new_request =
168                        match <S as ClassicCreatableAdminSpec>::try_convert_from_classic(request) {
169                            Some(new_request) => new_request,
170                            None => return Ok(None),
171                        };
172                    Ok(Some(CreateRequest::new(common, new_request)))
173                } else {
174                    Ok(Some(CreateRequest::<S>::decode_from(
175                        &mut buf,
176                        self.version,
177                    )?))
178                }
179            } else {
180                debug!(target_ty = S::LABEL, my_ty = self.ty, "different kind");
181                Ok(None)
182            }
183        }
184
185        pub(crate) fn set_buf(&mut self, version: Version, ty: String, buf: ByteBuf) {
186            self.buf = buf;
187            self.ty = ty;
188            self.version = version;
189        }
190    }
191
192    impl Encoder for CreateTypeBuffer {
193        fn write_size(&self, version: Version) -> usize {
194            if version >= DYN_OBJ {
195                self.ty.write_size(version) + 0_u32.write_size(version) + self.buf.len()
196            } else {
197                self.buf.len()
198            }
199        }
200
201        fn encode<T>(&self, dest: &mut T, version: Version) -> std::result::Result<(), IoError>
202        where
203            T: fluvio_protocol::bytes::BufMut,
204        {
205            if version >= DYN_OBJ {
206                self.ty.encode(dest, version)?;
207                let len: u32 = self.buf.len() as u32;
208                len.encode(dest, version)?; // write len
209                debug!(len, "encoding using new");
210            } else {
211                debug!(len = self.buf.len(), "encoding using old protocol");
212            }
213            dest.put(self.buf.as_ref());
214
215            Ok(())
216        }
217    }
218
219    // this is always using new protocol, classical decoding is done before by caller
220
221    impl Decoder for CreateTypeBuffer {
222        fn decode<T>(&mut self, src: &mut T, version: Version) -> std::result::Result<(), IoError>
223        where
224            T: fluvio_protocol::bytes::Buf,
225        {
226            debug!("decoding tybuffer using new protocol");
227            self.ty.decode(src, version)?;
228            tracing::trace!(ty = self.ty, "decoded type");
229            debug!(ty = self.ty, "decoded type");
230
231            let mut len: u32 = 0;
232            len.decode(src, version)?;
233            tracing::trace!(len, "decoded len");
234            debug!(len, "copy bytes");
235            if src.remaining() < len as usize {
236                return Err(IoError::new(
237                    ErrorKind::UnexpectedEof,
238                    format!(
239                        "not enough bytes, need: {}, remaining: {}",
240                        len,
241                        src.remaining()
242                    ),
243                ));
244            }
245            self.buf = src.copy_to_bytes(len as usize).into();
246            self.version = version;
247
248            Ok(())
249        }
250    }
251}