coerce/remote/stream/
system.rs

1use crate::remote::net::proto::network::{
2    LeaderChangedEvent, NewNodeEvent, NodeRemovedEvent, SystemEvent as SysEvent,
3};
4use crate::remote::net::StreamData;
5use crate::remote::stream::pubsub::Topic;
6use std::sync::Arc;
7
8use crate::remote::cluster::node::RemoteNode;
9
10use crate::remote::system::NodeId;
11use protobuf::{Enum, Error, Message};
12
/// Marker type for the system event topic.
///
/// Its [`Topic`] implementation publishes [`SystemEvent`] messages under the
/// topic name `"coerce"`.
pub struct SystemTopic;
14
/// Cluster-level events carried inside a [`SystemEvent`].
#[derive(Debug)]
pub enum ClusterEvent {
    /// Carries the node described by a `NewNodeEvent` message.
    NodeAdded(Arc<RemoteNode>),
    /// Carries the node described by a `NodeRemovedEvent` message.
    NodeRemoved(Arc<RemoteNode>),
    /// Carries the `node_id` from a `LeaderChangedEvent` message.
    LeaderChanged(NodeId),
}
21
/// Message type published on [`SystemTopic`].
#[derive(Debug)]
pub enum SystemEvent {
    /// A cluster event (node added/removed, leader changed).
    Cluster(ClusterEvent),
}
26
/// Binds [`SystemTopic`] to the [`SystemEvent`] message type.
impl Topic for SystemTopic {
    type Message = SystemEvent;

    /// Returns the fixed name of this topic, `"coerce"`.
    fn topic_name() -> &'static str {
        "coerce"
    }
}
34
35impl From<NewNodeEvent> for SystemEvent {
36    fn from(message: NewNodeEvent) -> Self {
37        let node = message.node.unwrap();
38
39        SystemEvent::Cluster(ClusterEvent::NodeAdded(Arc::new(node.into())))
40    }
41}
42
43impl From<NodeRemovedEvent> for SystemEvent {
44    fn from(message: NodeRemovedEvent) -> Self {
45        let node = message.node.unwrap();
46
47        SystemEvent::Cluster(ClusterEvent::NodeRemoved(Arc::new(node.into())))
48    }
49}
50impl From<LeaderChangedEvent> for SystemEvent {
51    fn from(message: LeaderChangedEvent) -> Self {
52        SystemEvent::Cluster(ClusterEvent::LeaderChanged(message.node_id))
53    }
54}
55
56impl StreamData for SystemEvent {
57    fn read_from_bytes(data: Vec<u8>) -> Option<Self> {
58        match data.split_first() {
59            Some((event, message)) => match SysEvent::from_i32(*event as i32) {
60                Some(SysEvent::ClusterNodeRemoved) => {
61                    Some(NodeRemovedEvent::parse_from_bytes(message).unwrap().into())
62                }
63                Some(SysEvent::ClusterNewNode) => {
64                    Some(NewNodeEvent::parse_from_bytes(message).unwrap().into())
65                }
66                Some(SysEvent::ClusterLeaderChanged) => Some(
67                    LeaderChangedEvent::parse_from_bytes(message)
68                        .unwrap()
69                        .into(),
70                ),
71                None => None,
72            },
73            None => None,
74        }
75    }
76
77    fn write_to_bytes(&self) -> Option<Vec<u8>> {
78        match self {
79            SystemEvent::Cluster(cluster) => match cluster {
80                ClusterEvent::NodeAdded(node) => {
81                    let event = NewNodeEvent {
82                        node: Some(node.as_ref().into()).into(),
83                        ..NewNodeEvent::default()
84                    };
85
86                    write_event(SysEvent::ClusterNewNode, event.write_to_bytes())
87                }
88                ClusterEvent::NodeRemoved(node) => {
89                    let event = NodeRemovedEvent {
90                        node: Some(node.as_ref().into()).into(),
91                        ..NodeRemovedEvent::default()
92                    };
93
94                    write_event(SysEvent::ClusterNodeRemoved, event.write_to_bytes())
95                }
96                ClusterEvent::LeaderChanged(node_id) => {
97                    let event = LeaderChangedEvent {
98                        node_id: *node_id,
99                        ..LeaderChangedEvent::default()
100                    };
101
102                    write_event(SysEvent::ClusterLeaderChanged, event.write_to_bytes())
103                }
104            },
105        }
106    }
107}
108
109fn write_event(system_event: SysEvent, message: Result<Vec<u8>, Error>) -> Option<Vec<u8>> {
110    let o = match message {
111        Ok(mut message) => {
112            message.insert(0, system_event as u8);
113
114            Some(message)
115        }
116        Err(_) => None,
117    };
118
119    o
120}