1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use std::fmt::Debug;

use fluvio_stream_model::core::{Spec, Status};

use fluvio_protocol::{Decoder, Encoder};
use fluvio_protocol::api::Request;
use fluvio_protocol::link::ErrorCode;
use fluvio_types::SpuId;

use crate::topic::TopicSpec;

/// Generic envelope around a single mirroring request payload of type `S`.
///
/// Encoding and decoding are provided by the derived `Encoder` / `Decoder`
/// implementations.
#[derive(Encoder, Decoder, Default, Debug, Clone)]
pub struct MirroringRemoteClusterRequest<S> {
    /// The wrapped request payload.
    pub request: S,
}

// Marks the request envelope as a `MirroringRemoteClusterSpec` for every
// payload type `S` that is itself encodable and decodable.
impl<S> MirroringRemoteClusterSpec for MirroringRemoteClusterRequest<S> where S: Encoder + Decoder {}

/// Mirroring message that carries the id of a remote.
///
/// With the `use_serde` feature enabled the type also derives
/// `Serialize` / `Deserialize`, using camelCase field names.
#[derive(Encoder, Decoder, Default, Debug, Clone, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct MirrorConnect {
    /// Identifier of the remote (presumably the remote cluster's id — confirm
    /// against the caller).
    pub remote_id: String,
}

// Marks `MirrorConnect` as a `MirroringRemoteClusterSpec`; the trait has no methods.
impl MirroringRemoteClusterSpec for MirrorConnect {}

/// Registers `MirrorConnect` as a `Spec`: labelled `"MirroringConnect"`,
/// indexed by a `String` key, reporting `MirroringStatus`, and owned by itself.
impl Spec for MirrorConnect {
    // Label associated with this spec type.
    const LABEL: &'static str = "MirroringConnect";
    // Key type used to index instances of this spec.
    type IndexKey = String;
    // Status type paired with this spec.
    type Status = MirroringStatus;
    // The spec is its own owner.
    type Owner = Self;
}

/// Status type paired with `MirrorConnect`; it currently has no fields.
///
/// With the `use_serde` feature enabled it also derives
/// `Serialize` / `Deserialize`.
#[derive(Encoder, Decoder, Default, Debug, Clone, PartialEq)]
#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
pub struct MirroringStatus {}

/// Renders the status as the fixed label `MirroringStatus`; the type has no
/// fields, so there is nothing else to show.
impl std::fmt::Display for MirroringStatus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A constant label needs no formatting machinery.
        f.write_str("MirroringStatus")
    }
}

// Opts `MirroringStatus` into the `Status` trait without overriding anything.
impl Status for MirroringStatus {}

/// Binds the `MirrorConnect` request envelope to the `Connect` API key and
/// declares `MirroringStatusResponse` as its response type.
impl Request for MirroringRemoteClusterRequest<MirrorConnect> {
    // Numeric API key: the `u16` discriminant of `MirroringApiKey::Connect`.
    const API_KEY: u16 = MirroringApiKey::Connect as u16;
    type Response = MirroringStatusResponse;
}

/// API keys for the mirroring API (original note: API call from client to SPU).
///
/// The enum is `#[repr(u16)]`, so each variant's explicit discriminant is the
/// numeric value obtained with `as u16`.
// NOTE(review): the "client to SPU" wording may be inherited from another API
// module — confirm it accurately describes mirroring traffic.
#[repr(u16)]
#[derive(Encoder, Decoder, Eq, PartialEq, Debug, Clone, Copy)]
#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
#[fluvio(encode_discriminant)]
pub enum MirroringApiKey {
    ApiVersion = 1, // version api key
    Connect = 3000, // key used by the `MirrorConnect` request
}

impl Default for MirroringApiKey {
    fn default() -> Self {
        Self::ApiVersion
    }
}

/// Marker trait for mirroring remote-cluster message types.
///
/// It declares no methods; implementors only need to be `Encoder + Decoder`.
pub trait MirroringRemoteClusterSpec: Encoder + Decoder {}

/// Response type for the mirroring connect request.
///
/// With the `use_serde` feature enabled the type also derives
/// `Serialize` / `Deserialize` with camelCase field names; `error_code` is
/// skipped by serde in that case.
#[derive(Encoder, Decoder, Default, Debug, Clone, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct MirroringStatusResponse {
    /// Name attached to the response (presumably the remote's name — confirm
    /// against the caller).
    pub name: String,
    /// Outcome code; `ErrorCode::None` is what `new_ok` sets for success.
    #[cfg_attr(feature = "use_serde", serde(skip))]
    pub error_code: ErrorCode,
    /// Optional human-readable error detail; `None` on success.
    pub error_message: Option<String>,
    /// Topic specs returned with the response, each wrapped with SPU details.
    pub topics: Vec<MirroringSpecWrapper<TopicSpec>>,
}

impl MirroringStatusResponse {
    /// Builds a successful response.
    ///
    /// `name` is copied into an owned `String`, `topics` is moved in as-is,
    /// the error code is `ErrorCode::None` and there is no error message.
    pub fn new_ok(name: &str, topics: Vec<MirroringSpecWrapper<TopicSpec>>) -> Self {
        Self {
            topics,
            error_message: None,
            error_code: ErrorCode::None,
            name: name.to_owned(),
        }
    }
}

/// Pairs a spec of type `S` with a key and the id and endpoint of an SPU.
///
/// With the `use_serde` feature enabled the type also derives
/// `Serialize` / `Deserialize` with camelCase field names.
#[derive(Encoder, Decoder, Default, Debug, Clone, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct MirroringSpecWrapper<S> {
    /// Key identifying the wrapped spec.
    pub key: String,
    /// The wrapped spec itself.
    pub spec: S,
    /// Id of the associated SPU.
    pub spu_id: SpuId,
    /// Endpoint of that SPU, as a string (exact format is not visible here).
    pub spu_endpoint: String,
}

impl<S> MirroringSpecWrapper<S> {
    /// Creates a wrapper from its four parts; every argument is stored as-is.
    ///
    /// `spu_id` is typed as `SpuId` to match the field it initialises, rather
    /// than spelling out the alias's underlying integer type.
    pub fn new(key: String, spec: S, spu_id: SpuId, spu_endpoint: String) -> Self {
        Self {
            key,
            spec,
            spu_id,
            spu_endpoint,
        }
    }
}