1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#![allow(clippy::assign_op_pattern)]

//!
//! # Message Type
//!
//! Message Type is used in Action-Centric messages to label the operation request.
//!
use std::fmt::Debug;
use std::fmt::Display;
use std::fmt;

use fluvio_protocol::{Encoder, Decoder};
use fluvio_stream_model::core::MetadataItem;
use fluvio_stream_model::core::Spec;
use fluvio_stream_model::store::MetadataStoreObject;
use fluvio_stream_model::store::actions::LSChange;

#[derive(Decoder, Default, Encoder, Debug, Eq, PartialEq, Clone)]
pub enum MsgType {
    #[default]
    #[fluvio(tag = 0)]
    UPDATE,
    #[fluvio(tag = 1)]
    DELETE,
}

#[derive(Decoder, Encoder, Debug, Eq, PartialEq, Clone, Default)]
pub struct Message<C> {
    pub header: MsgType,
    pub content: C,
}

impl<C> fmt::Display for Message<C>
where
    C: Display,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{:#?} {}", self.header, self.content)
    }
}

impl<C> Message<C> {
    pub fn new(typ: MsgType, content: C) -> Self {
        Message {
            header: typ,
            content,
        }
    }

    pub fn delete(content: C) -> Self {
        Self::new(MsgType::DELETE, content)
    }

    pub fn update(content: C) -> Self {
        Self::new(MsgType::UPDATE, content)
    }
}

impl<S, C, D> From<LSChange<S, C>> for Message<D>
where
    S: Spec,
    S::Status: PartialEq,
    C: MetadataItem,
    D: From<MetadataStoreObject<S, C>>,
{
    fn from(change: LSChange<S, C>) -> Self {
        match change {
            LSChange::Add(new) => Message::new(MsgType::UPDATE, new.into()),
            LSChange::Mod(new, _old) => Message::new(MsgType::DELETE, new.into()),
            LSChange::Delete(old) => Message::new(MsgType::DELETE, old.into()),
        }
    }
}