1use core::fmt;
5use std::collections::HashMap;
6
7use super::encoder::{Agent, AgentClass};
8use crate::pubsub::{
9 Content, ProtoAgentClass, ProtoAgentGroup, ProtoAgentId, ProtoMessage, ProtoPublish,
10 ProtoPublishType, ProtoSubscribe, ProtoSubscribeType, ProtoUnsubscribe, ProtoUnsubscribeType,
11};
12
13use thiserror::Error;
14use tracing::error;
15
16#[derive(Error, Debug, PartialEq)]
17pub enum MessageError {
18 #[error("name not found")]
19 NameNotFound,
20 #[error("class not found")]
21 ClassNotFound,
22 #[error("group not found")]
23 GroupNotFound,
24}
25
/// Kinds of routing command that can be attached to a message.
///
/// The `Display` rendering of a variant is what this file stores as the key
/// in a message's metadata map (via `to_string()`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommandType {
    /// Key for the connection a message was received from.
    ReceivedFrom,
    /// Key for the connection a message must be forwarded to.
    ForwardTo,
    /// Key for the incoming connection recorded on a message.
    IncomingConnection,
    /// Placeholder for an unrecognized command.
    Unknown,
}

impl fmt::Display for CommandType {
    /// Writes the variant name verbatim.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            CommandType::ReceivedFrom => "ReceivedFrom",
            CommandType::ForwardTo => "ForwardTo",
            CommandType::IncomingConnection => "IncomingConnection",
            // Previously rendered as the misspelled "Unkwon".
            CommandType::Unknown => "Unknown",
        };
        f.write_str(label)
    }
}
43
44fn create_source(source: &Agent) -> Option<ProtoAgentId> {
45 Some(ProtoAgentId {
46 class: Some(ProtoAgentClass {
47 group: Some(ProtoAgentGroup {
48 organization: source.agent_class.organization,
49 namespace: source.agent_class.namespace,
50 }),
51 class: source.agent_class.agent_class,
52 }),
53 id: Some(source.agent_id),
54 })
55}
56
57fn create_name(name: &AgentClass, id: Option<u64>) -> Option<ProtoAgentId> {
58 Some(ProtoAgentId {
59 class: Some(ProtoAgentClass {
60 group: Some(ProtoAgentGroup {
61 organization: name.organization,
62 namespace: name.namespace,
63 }),
64 class: name.agent_class,
65 }),
66 id,
67 })
68}
69
70pub fn create_subscription(
71 source: &Agent,
72 name: &AgentClass,
73 id: Option<u64>,
74 metadata: HashMap<String, String>,
75) -> ProtoMessage {
76 ProtoMessage {
77 metadata,
78 message_type: Some(ProtoSubscribeType(ProtoSubscribe {
79 source: create_source(source),
80 name: create_name(name, id),
81 })),
82 }
83}
84
85pub fn create_unsubscription(
86 source: &Agent,
87 name: &AgentClass,
88 id: Option<u64>,
89 metadata: HashMap<String, String>,
90) -> ProtoMessage {
91 ProtoMessage {
92 metadata,
93 message_type: Some(ProtoUnsubscribeType(ProtoUnsubscribe {
94 source: create_source(source),
95 name: create_name(name, id),
96 })),
97 }
98}
99
100pub fn create_publication(
101 source: &Agent,
102 name: &AgentClass,
103 id: Option<u64>,
104 metadata: HashMap<String, String>,
105 fanout: u32,
106 content_type: &str,
107 blob: Vec<u8>,
108) -> ProtoMessage {
109 ProtoMessage {
110 metadata,
111 message_type: Some(ProtoPublishType(ProtoPublish {
112 source: create_source(source),
113 name: create_name(name, id),
114 fanout,
115 msg: Some(Content {
116 content_type: content_type.to_string(),
117 blob,
118 }),
119 })),
120 }
121}
122
123pub fn create_subscription_from(
124 name: &AgentClass,
125 id: Option<u64>,
126 from_conn: u64,
127) -> ProtoMessage {
128 let source = Agent::default();
137
138 let mut metadata = HashMap::new();
140 metadata.insert(CommandType::ReceivedFrom.to_string(), from_conn.to_string());
141
142 create_subscription(&source, name, id, metadata)
146}
147
148pub fn create_subscription_to_forward(
149 source: &Agent,
150 name: &AgentClass,
151 id: Option<u64>,
152 to_conn: u64,
153) -> ProtoMessage {
154 let mut metadata = HashMap::new();
161 metadata.insert(CommandType::ForwardTo.to_string(), to_conn.to_string());
162
163 create_subscription(source, name, id, metadata)
164}
165
166pub fn create_unsubscription_from(
167 name: &AgentClass,
168 id: Option<u64>,
169 from_conn: u64,
170) -> ProtoMessage {
171 let source = Agent::default();
175
176 let mut metadata = HashMap::new();
178 metadata.insert(CommandType::ReceivedFrom.to_string(), from_conn.to_string());
179
180 create_unsubscription(&source, name, id, metadata)
182}
183
184pub fn add_incoming_connection(msg: &mut ProtoMessage, in_connection: u64) {
185 msg.metadata.insert(
186 CommandType::IncomingConnection.to_string(),
187 in_connection.to_string(),
188 );
189}
190
191pub fn get_incoming_connection(msg: &ProtoMessage) -> Option<u64> {
192 match msg
193 .metadata
194 .get(&CommandType::IncomingConnection.to_string())
195 {
196 None => None,
197 Some(conn) => conn.parse::<u64>().ok(),
198 }
199}
200
201pub fn create_unsubscription_to_forward(
202 source: &Agent,
203 name: &AgentClass,
204 id: Option<u64>,
205 to_conn: u64,
206) -> ProtoMessage {
207 let mut metadata = HashMap::new();
211 metadata.insert(CommandType::ForwardTo.to_string(), to_conn.to_string());
212
213 create_unsubscription(source, name, id, metadata)
214}
215
216pub fn process_name(name: &Option<ProtoAgentId>) -> Result<AgentClass, MessageError> {
217 if name.is_none() {
218 error! {"unable to parse message, name not found"};
219 return Err(MessageError::NameNotFound);
220 }
221
222 let name = name.unwrap();
223 if name.class.is_none() {
224 error! {"unable to parse message, class not found"};
225 return Err(MessageError::ClassNotFound);
226 }
227
228 let class = name.class.unwrap();
229
230 if class.group.is_none() {
231 error! {"unable to parse message, group not found"};
232 return Err(MessageError::GroupNotFound);
233 }
234
235 let group = class.group.unwrap();
236
237 let class = AgentClass {
238 organization: group.organization,
239 namespace: group.namespace,
240 agent_class: class.class,
241 };
242
243 Ok(class)
244}
245
246pub fn get_agent_id(name: &Option<ProtoAgentId>) -> Option<u64> {
247 match name {
248 None => None,
249 Some(n) => n.id,
250 }
251}
252
253pub fn get_fanout(pubmsg: &ProtoPublish) -> u32 {
254 pubmsg.fanout
255}
256
257pub fn get_payload(pubmsg: &ProtoPublish) -> &[u8] {
258 &pubmsg.msg.as_ref().unwrap().blob
259}