fluvio_sc_schema/objects/
watch.rs

1#![allow(clippy::assign_op_pattern)]
2
3use std::fmt::Debug;
4use std::marker::PhantomData;
5
6use anyhow::Result;
7
8use fluvio_controlplane_metadata::message::Message;
9use fluvio_protocol::{Encoder, Decoder, Version};
10use fluvio_protocol::api::Request;
11use fluvio_controlplane_metadata::store::Epoch;
12
13use crate::{AdminPublicApiKey, AdminSpec, TryEncodableFrom};
14use crate::core::Spec;
15
16use super::classic::{ClassicObjectApiEnum, ClassicDecoding};
17use super::{Metadata, COMMON_VERSION, TypeBuffer};
18
19/// Watch resources
20/// Argument epoch is not being used, it is always 0
21#[derive(Debug, Encoder, Default, Decoder)]
22pub struct WatchRequest<S: AdminSpec> {
23    epoch: Epoch,
24    #[fluvio(min_version = 10)]
25    pub summary: bool, // if true, only return summary
26    data: PhantomData<S>,
27}
28
29impl<S> WatchRequest<S>
30where
31    S: AdminSpec,
32{
33    pub fn summary() -> Self {
34        Self {
35            summary: true,
36            ..Default::default()
37        }
38    }
39}
40
41#[derive(Debug, Default, Encoder)]
42pub struct ObjectApiWatchRequest(TypeBuffer);
43
44impl<S> TryEncodableFrom<WatchRequest<S>> for ObjectApiWatchRequest
45where
46    S: AdminSpec,
47{
48    fn try_encode_from(input: WatchRequest<S>, version: Version) -> Result<Self> {
49        Ok(Self(TypeBuffer::encode::<S, _>(input, version)?))
50    }
51
52    fn downcast(&self) -> Result<Option<WatchRequest<S>>> {
53        self.0.downcast::<S, _>()
54    }
55}
56
57impl Request for ObjectApiWatchRequest {
58    const API_KEY: u16 = AdminPublicApiKey::Watch as u16;
59    const MIN_API_VERSION: i16 = 15;
60    const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
61    type Response = ObjectApiWatchResponse;
62}
63
64#[derive(Debug, Default, Encoder)]
65pub struct ObjectApiWatchResponse(TypeBuffer);
66
67impl<S> TryEncodableFrom<WatchResponse<S>> for ObjectApiWatchResponse
68where
69    S: AdminSpec,
70    S::Status: Encoder + Decoder,
71{
72    fn try_encode_from(input: WatchResponse<S>, version: Version) -> Result<Self> {
73        Ok(Self(TypeBuffer::encode::<S, _>(input, version)?))
74    }
75
76    fn downcast(&self) -> Result<Option<WatchResponse<S>>> {
77        self.0.downcast::<S, _>()
78    }
79}
80
81#[derive(Debug, Default, Encoder, Decoder, Clone)]
82pub struct WatchResponse<S: AdminSpec>
83where
84    S::Status: Encoder + Decoder,
85{
86    inner: MetadataUpdate<S>,
87}
88
89impl<S> WatchResponse<S>
90where
91    S: AdminSpec,
92    S::Status: Encoder + Decoder,
93{
94    pub fn new(inner: MetadataUpdate<S>) -> Self {
95        Self { inner }
96    }
97
98    pub fn inner(self) -> MetadataUpdate<S> {
99        self.inner
100    }
101}
102
103/// updates on metadata
104#[derive(Encoder, Decoder, Default, Clone, Debug)]
105pub struct MetadataUpdate<S>
106where
107    S: Spec + Encoder + Decoder,
108    S::Status: Encoder + Decoder + Debug,
109{
110    pub epoch: Epoch,
111    pub changes: Vec<Message<Metadata<S>>>,
112    pub all: Vec<Metadata<S>>,
113}
114
115impl<S> MetadataUpdate<S>
116where
117    S: Spec + Encoder + Decoder,
118    S::Status: Encoder + Decoder + Debug,
119{
120    pub fn with_changes(epoch: i64, changes: Vec<Message<Metadata<S>>>) -> Self {
121        Self {
122            epoch,
123            changes,
124            all: vec![],
125        }
126    }
127
128    pub fn with_all(epoch: i64, all: Vec<Metadata<S>>) -> Self {
129        Self {
130            epoch,
131            changes: vec![],
132            all,
133        }
134    }
135}
136
137// for supporting classic, this should go away after we remove classic
138ClassicObjectApiEnum!(WatchRequest);
139ClassicObjectApiEnum!(WatchResponse);
140ClassicDecoding!(WatchRequest);
141ClassicDecoding!(WatchResponse);