fluvio_controlplane_metadata/mirroring/
mod.rs

1use std::fmt::Debug;
2
3use fluvio_stream_model::core::{Spec, Status};
4
5use fluvio_protocol::{Decoder, Encoder};
6use fluvio_protocol::api::Request;
7use fluvio_protocol::link::ErrorCode;
8use fluvio_types::SpuId;
9
10use crate::topic::TopicSpec;
11
12#[derive(Encoder, Decoder, Default, Debug, Clone)]
13pub struct MirroringRemoteClusterRequest<S> {
14    pub request: S,
15}
16
17impl<S> MirroringRemoteClusterSpec for MirroringRemoteClusterRequest<S> where S: Encoder + Decoder {}
18
19#[derive(Encoder, Decoder, Default, Debug, Clone, PartialEq)]
20#[cfg_attr(
21    feature = "use_serde",
22    derive(serde::Serialize, serde::Deserialize),
23    serde(rename_all = "camelCase")
24)]
25pub struct MirrorConnect {
26    pub remote_id: String,
27}
28
// Marker impl: the trait declares no items, so this only tags `MirrorConnect`
// as usable wherever a `MirroringRemoteClusterSpec` is required.
impl MirroringRemoteClusterSpec for MirrorConnect {}
30
31impl Spec for MirrorConnect {
32    const LABEL: &'static str = "MirroringConnect";
33    type IndexKey = String;
34    type Status = MirroringStatus;
35    type Owner = Self;
36}
37
38#[derive(Encoder, Decoder, Default, Debug, Clone, PartialEq)]
39#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
40pub struct MirroringStatus {}
41
42impl std::fmt::Display for MirroringStatus {
43    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
44        write!(f, "MirroringStatus")
45    }
46}
47
// Empty impl: no trait items are overridden here. This is what allows
// `MirroringStatus` to be named as `Spec::Status` for `MirrorConnect`.
impl Status for MirroringStatus {}
49
50impl Request for MirroringRemoteClusterRequest<MirrorConnect> {
51    const API_KEY: u16 = MirroringApiKey::Connect as u16;
52    type Response = MirroringStatusResponse;
53}
54
55/// API call from client to SPU
56#[repr(u16)]
57#[derive(Encoder, Decoder, Eq, PartialEq, Debug, Clone, Copy)]
58#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
59#[fluvio(encode_discriminant)]
60pub enum MirroringApiKey {
61    ApiVersion = 1, // version api key
62    Connect = 3000,
63}
64
65impl Default for MirroringApiKey {
66    fn default() -> Self {
67        Self::ApiVersion
68    }
69}
70
/// Marker trait for types usable as mirroring remote-cluster specs.
///
/// It declares no items of its own; implementors only have to be both
/// `Encoder` and `Decoder`.
pub trait MirroringRemoteClusterSpec: Encoder + Decoder {}
72
73#[derive(Encoder, Decoder, Default, Debug, Clone, PartialEq)]
74#[cfg_attr(
75    feature = "use_serde",
76    derive(serde::Serialize, serde::Deserialize),
77    serde(rename_all = "camelCase")
78)]
79pub struct MirroringStatusResponse {
80    pub name: String,
81    #[cfg_attr(feature = "use_serde", serde(skip))]
82    pub error_code: ErrorCode,
83    pub error_message: Option<String>,
84    pub topics: Vec<MirroringSpecWrapper<TopicSpec>>,
85}
86
87impl MirroringStatusResponse {
88    pub fn new_ok(name: &str, topics: Vec<MirroringSpecWrapper<TopicSpec>>) -> Self {
89        MirroringStatusResponse {
90            name: name.to_string(),
91            error_code: ErrorCode::None,
92            error_message: None,
93            topics,
94        }
95    }
96}
97
98#[derive(Encoder, Decoder, Default, Debug, Clone, PartialEq)]
99#[cfg_attr(
100    feature = "use_serde",
101    derive(serde::Serialize, serde::Deserialize),
102    serde(rename_all = "camelCase")
103)]
104pub struct MirroringSpecWrapper<S> {
105    pub key: String,
106    pub spec: S,
107    pub spu_id: SpuId,
108    pub spu_key: String,
109    pub spu_endpoint: String,
110}
111
112impl<S> MirroringSpecWrapper<S> {
113    pub fn new(key: String, spec: S, spu_id: i32, spu_endpoint: String, spu_key: String) -> Self {
114        Self {
115            key,
116            spec,
117            spu_id,
118            spu_endpoint,
119            spu_key,
120        }
121    }
122}