coerce/remote/stream/
system.rs1use crate::remote::net::proto::network::{
2 LeaderChangedEvent, NewNodeEvent, NodeRemovedEvent, SystemEvent as SysEvent,
3};
4use crate::remote::net::StreamData;
5use crate::remote::stream::pubsub::Topic;
6use std::sync::Arc;
7
8use crate::remote::cluster::node::RemoteNode;
9
10use crate::remote::system::NodeId;
11use protobuf::{Enum, Error, Message};
12
/// Unit type identifying the system-event topic.
///
/// Its `Topic` implementation in this file uses [`SystemEvent`] as the
/// message type and `"coerce"` as the topic name.
pub struct SystemTopic;
14
/// Cluster-level events carried by [`SystemEvent::Cluster`].
#[derive(Debug)]
pub enum ClusterEvent {
    /// Produced from a `NewNodeEvent`; holds the node converted to a `RemoteNode`.
    NodeAdded(Arc<RemoteNode>),
    /// Produced from a `NodeRemovedEvent`; holds the node converted to a `RemoteNode`.
    NodeRemoved(Arc<RemoteNode>),
    /// Produced from a `LeaderChangedEvent`; holds that message's `node_id`
    /// (presumably the id of the new leader -- confirm against the publisher).
    LeaderChanged(NodeId),
}
21
/// Message type of [`SystemTopic`].
///
/// Converted from the protobuf `NewNodeEvent`, `NodeRemovedEvent` and
/// `LeaderChangedEvent` messages via the `From` impls in this file.
#[derive(Debug)]
pub enum SystemEvent {
    /// A cluster event; the only kind of system event defined here.
    Cluster(ClusterEvent),
}
26
27impl Topic for SystemTopic {
28 type Message = SystemEvent;
29
30 fn topic_name() -> &'static str {
31 "coerce"
32 }
33}
34
35impl From<NewNodeEvent> for SystemEvent {
36 fn from(message: NewNodeEvent) -> Self {
37 let node = message.node.unwrap();
38
39 SystemEvent::Cluster(ClusterEvent::NodeAdded(Arc::new(node.into())))
40 }
41}
42
43impl From<NodeRemovedEvent> for SystemEvent {
44 fn from(message: NodeRemovedEvent) -> Self {
45 let node = message.node.unwrap();
46
47 SystemEvent::Cluster(ClusterEvent::NodeRemoved(Arc::new(node.into())))
48 }
49}
50impl From<LeaderChangedEvent> for SystemEvent {
51 fn from(message: LeaderChangedEvent) -> Self {
52 SystemEvent::Cluster(ClusterEvent::LeaderChanged(message.node_id))
53 }
54}
55
56impl StreamData for SystemEvent {
57 fn read_from_bytes(data: Vec<u8>) -> Option<Self> {
58 match data.split_first() {
59 Some((event, message)) => match SysEvent::from_i32(*event as i32) {
60 Some(SysEvent::ClusterNodeRemoved) => {
61 Some(NodeRemovedEvent::parse_from_bytes(message).unwrap().into())
62 }
63 Some(SysEvent::ClusterNewNode) => {
64 Some(NewNodeEvent::parse_from_bytes(message).unwrap().into())
65 }
66 Some(SysEvent::ClusterLeaderChanged) => Some(
67 LeaderChangedEvent::parse_from_bytes(message)
68 .unwrap()
69 .into(),
70 ),
71 None => None,
72 },
73 None => None,
74 }
75 }
76
77 fn write_to_bytes(&self) -> Option<Vec<u8>> {
78 match self {
79 SystemEvent::Cluster(cluster) => match cluster {
80 ClusterEvent::NodeAdded(node) => {
81 let event = NewNodeEvent {
82 node: Some(node.as_ref().into()).into(),
83 ..NewNodeEvent::default()
84 };
85
86 write_event(SysEvent::ClusterNewNode, event.write_to_bytes())
87 }
88 ClusterEvent::NodeRemoved(node) => {
89 let event = NodeRemovedEvent {
90 node: Some(node.as_ref().into()).into(),
91 ..NodeRemovedEvent::default()
92 };
93
94 write_event(SysEvent::ClusterNodeRemoved, event.write_to_bytes())
95 }
96 ClusterEvent::LeaderChanged(node_id) => {
97 let event = LeaderChangedEvent {
98 node_id: *node_id,
99 ..LeaderChangedEvent::default()
100 };
101
102 write_event(SysEvent::ClusterLeaderChanged, event.write_to_bytes())
103 }
104 },
105 }
106 }
107}
108
109fn write_event(system_event: SysEvent, message: Result<Vec<u8>, Error>) -> Option<Vec<u8>> {
110 let o = match message {
111 Ok(mut message) => {
112 message.insert(0, system_event as u8);
113
114 Some(message)
115 }
116 Err(_) => None,
117 };
118
119 o
120}