1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
//!
//! # API Requests
//!
//! Maps SC API requests to their associated responses.
//!

use std::convert::{TryInto};
use std::io::Error as IoError;
use std::fmt::Debug;

use tracing::{debug};

use fluvio_protocol::bytes::{Buf};
use fluvio_protocol::api::{ApiMessage, RequestHeader, RequestMessage};
use fluvio_protocol::api::api_decode;
use fluvio_protocol::core::{Decoder};
use fluvio_protocol::link::versions::ApiVersionsRequest;

use crate::mirroring::ObjectMirroringRequest;
use crate::AdminPublicApiKey;
use crate::objects::{
    ObjectApiCreateRequest, ObjectApiDeleteRequest, ObjectApiListRequest, ObjectApiWatchRequest,
};

/// Non-generic admin request, typically produced when decoding.
///
/// Each variant wraps the [`RequestMessage`] (header plus request body) for
/// one kind of admin public API request. The variant is chosen from the
/// [`AdminPublicApiKey`] found in the request header by
/// [`ApiMessage::decode_from`].
#[derive(Debug)]
pub enum AdminPublicDecodedRequest {
    /// API versions request; also the value returned by `Default`.
    ApiVersionsRequest(RequestMessage<ApiVersionsRequest>),
    /// Object create request.
    ///
    /// NOTE(review): this is the only boxed payload — presumably to keep the
    /// enum small; confirm.
    CreateRequest(Box<RequestMessage<ObjectApiCreateRequest>>),
    /// Object delete request.
    DeleteRequest(RequestMessage<ObjectApiDeleteRequest>),
    /// Object list request.
    ListRequest(RequestMessage<ObjectApiListRequest>),
    /// Object watch request.
    WatchRequest(RequestMessage<ObjectApiWatchRequest>),
    /// Mirroring request.
    MirroringRequest(RequestMessage<ObjectMirroringRequest>),
}

impl Default for AdminPublicDecodedRequest {
    /// Returns an `ApiVersionsRequest` variant wrapping a default request message.
    fn default() -> Self {
        // The payload type is fixed by the variant, so inference picks
        // `RequestMessage<ApiVersionsRequest>` without a turbofish.
        Self::ApiVersionsRequest(RequestMessage::default())
    }
}

impl ApiMessage for AdminPublicDecodedRequest {
    type ApiKey = AdminPublicApiKey;

    /// Not supported for this type; use [`ApiMessage::decode_from`] instead.
    ///
    /// # Panics
    ///
    /// Always panics with the message `"not needed"`.
    fn decode_with_header<T>(_src: &mut T, _header: RequestHeader) -> Result<Self, IoError>
    where
        Self: Default + Sized,
        Self::ApiKey: Sized,
        T: Buf,
    {
        panic!("not needed")
    }

    /// Decodes a complete admin public request from `src`.
    ///
    /// The request header is read first (decoded with version `0`). Its API
    /// key selects which request body to decode, and its API version is the
    /// version used to decode that body.
    ///
    /// # Errors
    ///
    /// Propagates any error from decoding the header, from converting the
    /// header's API key into an [`AdminPublicApiKey`], or from decoding the
    /// request body.
    fn decode_from<T>(src: &mut T) -> Result<Self, IoError>
    where
        T: Buf,
    {
        let header = RequestHeader::decode_from(src, 0)?;
        let version = header.api_version();
        let api_key: AdminPublicApiKey = header.api_key().try_into()?;
        debug!(
            "decoding admin public request from: {} api: {:#?}",
            header.client_id(),
            api_key
        );

        // Every arm decodes the body with the version advertised in the
        // header, then pairs it with that header in the matching variant.
        match api_key {
            AdminPublicApiKey::ApiVersion => api_decode!(Self, ApiVersionsRequest, src, header),
            AdminPublicApiKey::Create => {
                let body = ObjectApiCreateRequest::decode_from(src, version)?;
                Ok(Self::CreateRequest(Box::new(RequestMessage::new(header, body))))
            }
            AdminPublicApiKey::Delete => {
                let body = ObjectApiDeleteRequest::decode_from(src, version)?;
                Ok(Self::DeleteRequest(RequestMessage::new(header, body)))
            }
            AdminPublicApiKey::List => {
                let body = ObjectApiListRequest::decode_from(src, version)?;
                Ok(Self::ListRequest(RequestMessage::new(header, body)))
            }
            AdminPublicApiKey::Watch => {
                let body = ObjectApiWatchRequest::decode_from(src, version)?;
                Ok(Self::WatchRequest(RequestMessage::new(header, body)))
            }
            AdminPublicApiKey::Mirroring => {
                let body = ObjectMirroringRequest::decode_from(src, version)?;
                Ok(Self::MirroringRequest(RequestMessage::new(header, body)))
            }
        }
    }
}

#[cfg(test)]
mod test {

    use std::io::Cursor;

    use fluvio_controlplane_metadata::topic::TopicSpec;
    use fluvio_protocol::api::RequestMessage;
    use fluvio_protocol::{Encoder};
    use fluvio_protocol::api::ApiMessage;

    use crate::objects::{ObjectApiListRequest, ListRequest, COMMON_VERSION};
    use crate::{AdminPublicDecodedRequest, TryEncodableFrom};

    /// Builds a topic list request with no name filters, encoded at
    /// `COMMON_VERSION`.
    fn create_req() -> ObjectApiListRequest {
        let list_request: ListRequest<TopicSpec> = ListRequest::new(vec![], false);
        ObjectApiListRequest::try_encode_from(list_request, COMMON_VERSION).expect("encode")
    }

    /// Encodes a list request message and checks that decoding the bytes
    /// yields the `ListRequest` variant.
    #[test]
    fn test_list_encode_decoding() {
        let list_req = create_req();

        let mut req_msg = RequestMessage::new_request(list_req);
        // The header's API version must be a protocol version, not the API
        // key. Use the same version the payload was encoded with in
        // `create_req` so header and payload agree.
        req_msg
            .get_mut_header()
            .set_client_id("test")
            .set_api_version(COMMON_VERSION);

        let mut src = vec![];
        req_msg.encode(&mut src, 0).expect("encoding");

        let dec_req: AdminPublicDecodedRequest =
            AdminPublicDecodedRequest::decode_from(&mut Cursor::new(&src)).expect("decode");

        assert!(matches!(dec_req, AdminPublicDecodedRequest::ListRequest(_)));
    }
}