1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
#![allow(clippy::assign_op_pattern)]

use std::fmt::Debug;

use dataplane::core::{Encoder, Decoder};
use dataplane::api::Request;
use fluvio_controlplane_metadata::derivedstream::DerivedStreamSpec;
use fluvio_protocol::Version;
use crate::topic::TopicSpec;
use crate::customspu::CustomSpuSpec;
use crate::smartmodule::SmartModuleSpec;
use crate::tableformat::TableFormatSpec;
use crate::spg::SpuGroupSpec;
use crate::connector::ManagedConnectorSpec;

use crate::{AdminPublicApiKey, CreatableAdminSpec, Status};

#[derive(Encoder, Decoder, Default, Debug)]
pub struct CreateRequest<S: CreatableAdminSpec> {
    pub request: S,
}

/// Every create request must have this parameters
#[derive(Encoder, Decoder, Default, Debug)]
pub struct CommonCreateRequest {
    pub name: String,
    pub dry_run: bool,
}

impl Request for ObjectApiCreateRequest {
    const API_KEY: u16 = AdminPublicApiKey::Create as u16;
    const DEFAULT_API_VERSION: i16 = 3;
    type Response = Status;
}

#[derive(Debug, Default, Encoder, Decoder)]
pub struct ObjectApiCreateRequest {
    pub common: CommonCreateRequest,
    pub request: ObjectCreateRequest,
}

#[derive(Debug)]
pub enum ObjectCreateRequest {
    Topic(TopicSpec),
    CustomSpu(CustomSpuSpec),
    SmartModule(SmartModuleSpec),
    ManagedConnector(ManagedConnectorSpec),
    SpuGroup(SpuGroupSpec),
    TableFormat(TableFormatSpec),
    DerivedStream(DerivedStreamSpec),
}

impl Default for ObjectCreateRequest {
    fn default() -> Self {
        Self::Topic(TopicSpec::default())
    }
}

impl ObjectCreateRequest {
    fn type_value(&self) -> u8 {
        match self {
            Self::Topic(_) => TopicSpec::CREATE_TYPE,
            Self::CustomSpu(_) => CustomSpuSpec::CREATE_TYPE,
            Self::SmartModule(_) => SmartModuleSpec::CREATE_TYPE,
            Self::ManagedConnector(_) => ManagedConnectorSpec::CREATE_TYPE,
            Self::SpuGroup(_) => SpuGroupSpec::CREATE_TYPE,
            Self::TableFormat(_) => TableFormatSpec::CREATE_TYPE,
            Self::DerivedStream(_) => DerivedStreamSpec::CREATE_TYPE,
        }
    }
}

impl Encoder for ObjectCreateRequest {
    fn write_size(&self, version: dataplane::core::Version) -> usize {
        let type_size = (0u8).write_size(version);

        type_size
            + match self {
                Self::Topic(s) => s.write_size(version),
                Self::CustomSpu(s) => s.write_size(version),
                Self::SmartModule(s) => s.write_size(version),
                Self::ManagedConnector(s) => s.write_size(version),
                Self::SpuGroup(s) => s.write_size(version),
                Self::TableFormat(s) => s.write_size(version),
                Self::DerivedStream(s) => s.write_size(version),
            }
    }

    fn encode<T>(
        &self,
        dest: &mut T,
        version: dataplane::core::Version,
    ) -> Result<(), std::io::Error>
    where
        T: dataplane::bytes::BufMut,
    {
        self.type_value().encode(dest, version)?;
        match self {
            Self::Topic(s) => s.encode(dest, version)?,
            Self::CustomSpu(s) => s.encode(dest, version)?,
            Self::ManagedConnector(s) => s.encode(dest, version)?,
            Self::SmartModule(s) => s.encode(dest, version)?,
            Self::SpuGroup(s) => s.encode(dest, version)?,
            Self::TableFormat(s) => s.encode(dest, version)?,
            Self::DerivedStream(s) => s.encode(dest, version)?,
        }

        Ok(())
    }
}

// We implement decode signature even thought this will be never called.
// RequestMessage use decode_object.  But in order to provide backward compatibility, we pretend
// to provide decode implementation but shoudl be never called
impl dataplane::core::Decoder for ObjectCreateRequest {
    fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), std::io::Error>
    where
        T: dataplane::bytes::Buf,
    {
        let mut typ: u8 = 0;
        typ.decode(src, version)?;
        tracing::trace!("decoded type: {}", typ);

        match typ {
            TopicSpec::CREATE_TYPE => {
                tracing::trace!("detected topic");
                let mut request = TopicSpec::default();
                request.decode(src, version)?;
                *self = Self::Topic(request);
                Ok(())
            }

            TableFormatSpec::CREATE_TYPE => {
                tracing::trace!("detected table");
                let mut request = TableFormatSpec::default();
                request.decode(src, version)?;
                *self = Self::TableFormat(request);
                Ok(())
            }

            CustomSpuSpec::CREATE_TYPE => {
                tracing::trace!("detected custom spu");
                let mut request = CustomSpuSpec::default();
                request.decode(src, version)?;
                *self = Self::CustomSpu(request);
                Ok(())
            }

            SpuGroupSpec::CREATE_TYPE => {
                tracing::trace!("detected custom spu");
                let mut request = SpuGroupSpec::default();
                request.decode(src, version)?;
                *self = Self::SpuGroup(request);
                Ok(())
            }

            SmartModuleSpec::CREATE_TYPE => {
                tracing::trace!("detected smartmodule");
                let mut request = SmartModuleSpec::default();
                request.decode(src, version)?;
                *self = Self::SmartModule(request);
                Ok(())
            }

            ManagedConnectorSpec::CREATE_TYPE => {
                tracing::trace!("detected connector");
                let mut request = ManagedConnectorSpec::default();
                request.decode(src, version)?;
                *self = Self::ManagedConnector(request);
                Ok(())
            }

            DerivedStreamSpec::CREATE_TYPE => {
                tracing::trace!("detected derivedstream");
                let mut request = DerivedStreamSpec::default();
                request.decode(src, version)?;
                *self = Self::DerivedStream(request);
                Ok(())
            }

            // Unexpected type
            _ => Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("invalid create type {:#?}", typ),
            )),
        }
    }
}

/// Macro to convert create request
/// impl From<(CommonCreateRequest TopicSpec)> for ObjectApiCreateRequest {
/// fn from(req: (CommonCreateRequest TopicSpec)) -> Self {
///       ObjectApiCreateRequest {
///           common: req.0,
///           request: req.1
///       }
/// }
/// ObjectFrom!(WatchRequest, Topic);

macro_rules! CreateFrom {
    ($create:ty,$specTy:ident) => {
        impl From<(crate::objects::CommonCreateRequest, $create)>
            for crate::objects::ObjectApiCreateRequest
        {
            fn from(fr: (crate::objects::CommonCreateRequest, $create)) -> Self {
                crate::objects::ObjectApiCreateRequest {
                    common: fr.0,
                    request: crate::objects::ObjectCreateRequest::$specTy(fr.1),
                }
            }
        }
    };
}

pub(crate) use CreateFrom;