1use core::fmt;
5use std::collections::HashMap;
6
7use super::encoder::{Agent, AgentClass, DEFAULT_AGENT_ID};
8use crate::pubsub::{
9 Content, ProtoAgentClass, ProtoAgentGroup, ProtoAgentId, ProtoMessage, ProtoPublish,
10 ProtoPublishType, ProtoSubscribe, ProtoSubscribeType, ProtoUnsubscribe, ProtoUnsubscribeType,
11};
12
13use thiserror::Error;
14use tracing::error;
15
16#[derive(Error, Debug, PartialEq)]
17pub enum MessageError {
18 #[error("name not found")]
19 NameNotFound,
20 #[error("class not found")]
21 ClassNotFound,
22 #[error("group not found")]
23 GroupNotFound,
24}
25
/// Keys used in the string metadata map attached to a message.
///
/// The `Display` implementation yields the variant name, and that string is
/// what the helpers in this file use as the map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MetadataType {
    /// Key under which the `*_from` helpers store the `from_conn` value.
    ReceivedFrom,
    /// Key under which the `*_to_forward` helpers store the `to_conn` value.
    ForwardTo,
    /// Key written by `add_incoming_connection` and read back by
    /// `get_incoming_connection`.
    IncomingConnection,
    /// Key named `Error`; its producers are not visible in this file.
    Error,
    /// Key named `Unknown`; its producers are not visible in this file.
    Unknown,
}
33
34impl fmt::Display for MetadataType {
35 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
36 match self {
37 MetadataType::ReceivedFrom => write!(f, "ReceivedFrom"),
38 MetadataType::ForwardTo => write!(f, "ForwardTo"),
39 MetadataType::IncomingConnection => write!(f, "IncomingConnection"),
40 MetadataType::Error => write!(f, "Error"),
41 MetadataType::Unknown => write!(f, "Unknown"),
42 }
43 }
44}
45
46fn create_source(source: &Agent) -> Option<ProtoAgentId> {
47 Some(ProtoAgentId {
48 class: Some(ProtoAgentClass {
49 group: Some(ProtoAgentGroup {
50 organization: source.agent_class.organization,
51 namespace: source.agent_class.namespace,
52 }),
53 class: source.agent_class.agent_class,
54 }),
55 id: Some(source.agent_id),
56 })
57}
58
59fn create_name(name: &AgentClass, id: Option<u64>) -> Option<ProtoAgentId> {
60 Some(ProtoAgentId {
61 class: Some(ProtoAgentClass {
62 group: Some(ProtoAgentGroup {
63 organization: name.organization,
64 namespace: name.namespace,
65 }),
66 class: name.agent_class,
67 }),
68 id,
69 })
70}
71
72pub fn create_subscription(
73 source: &Agent,
74 name: &AgentClass,
75 id: Option<u64>,
76 metadata: HashMap<String, String>,
77) -> ProtoMessage {
78 ProtoMessage {
79 metadata,
80 message_type: Some(ProtoSubscribeType(ProtoSubscribe {
81 source: create_source(source),
82 name: create_name(name, id),
83 })),
84 }
85}
86
87pub fn create_unsubscription(
88 source: &Agent,
89 name: &AgentClass,
90 id: Option<u64>,
91 metadata: HashMap<String, String>,
92) -> ProtoMessage {
93 ProtoMessage {
94 metadata,
95 message_type: Some(ProtoUnsubscribeType(ProtoUnsubscribe {
96 source: create_source(source),
97 name: create_name(name, id),
98 })),
99 }
100}
101
102pub fn create_publication(
103 source: &Agent,
104 name: &AgentClass,
105 id: Option<u64>,
106 metadata: HashMap<String, String>,
107 fanout: u32,
108 content_type: &str,
109 blob: Vec<u8>,
110) -> ProtoMessage {
111 ProtoMessage {
112 metadata,
113 message_type: Some(ProtoPublishType(ProtoPublish {
114 source: create_source(source),
115 name: create_name(name, id),
116 fanout,
117 msg: Some(Content {
118 content_type: content_type.to_string(),
119 blob,
120 }),
121 })),
122 }
123}
124
125pub fn create_subscription_from(
126 name: &AgentClass,
127 id: Option<u64>,
128 from_conn: u64,
129) -> ProtoMessage {
130 let source = Agent::default();
139
140 let mut metadata = HashMap::new();
142 metadata.insert(
143 MetadataType::ReceivedFrom.to_string(),
144 from_conn.to_string(),
145 );
146
147 create_subscription(&source, name, id, metadata)
151}
152
153pub fn create_subscription_to_forward(
154 source: &Agent,
155 name: &AgentClass,
156 id: Option<u64>,
157 to_conn: u64,
158) -> ProtoMessage {
159 let mut metadata = HashMap::new();
166 metadata.insert(MetadataType::ForwardTo.to_string(), to_conn.to_string());
167
168 create_subscription(source, name, id, metadata)
169}
170
171pub fn create_unsubscription_from(
172 name: &AgentClass,
173 id: Option<u64>,
174 from_conn: u64,
175) -> ProtoMessage {
176 let source = Agent::default();
180
181 let mut metadata = HashMap::new();
183 metadata.insert(
184 MetadataType::ReceivedFrom.to_string(),
185 from_conn.to_string(),
186 );
187
188 create_unsubscription(&source, name, id, metadata)
190}
191
192pub fn add_incoming_connection(msg: &mut ProtoMessage, in_connection: u64) {
193 msg.metadata.insert(
194 MetadataType::IncomingConnection.to_string(),
195 in_connection.to_string(),
196 );
197}
198
199pub fn get_incoming_connection(msg: &ProtoMessage) -> Option<u64> {
200 match msg
201 .metadata
202 .get(&MetadataType::IncomingConnection.to_string())
203 {
204 None => None,
205 Some(conn) => conn.parse::<u64>().ok(),
206 }
207}
208
209pub fn get_source(msg: &ProtoMessage) -> Option<Agent> {
210 let source = match &msg.message_type {
211 Some(msg_type) => match msg_type {
212 ProtoPublishType(publish) => publish.source,
213 ProtoSubscribeType(sub) => sub.source,
214 ProtoUnsubscribeType(unsub) => unsub.source,
215 },
216 None => None,
217 };
218
219 source?;
220
221 let (class, id) = match process_name(&source) {
222 Ok(class) => (Some(class), get_agent_id(&source)),
223 Err(_) => (None, None),
224 };
225
226 let unwrap_class = class?;
227
228 let src_name = Agent {
229 agent_class: unwrap_class,
230 agent_id: id.unwrap_or(DEFAULT_AGENT_ID),
231 };
232
233 Some(src_name)
234}
235
236pub fn get_name(msg: &ProtoMessage) -> Option<Agent> {
237 let name = match &msg.message_type {
238 Some(msg_type) => match msg_type {
239 ProtoPublishType(publish) => publish.name,
240 ProtoSubscribeType(sub) => sub.name,
241 ProtoUnsubscribeType(unsub) => unsub.name,
242 },
243 None => None,
244 };
245
246 name?;
247
248 let (class, id) = match process_name(&name) {
249 Ok(class) => (Some(class), get_agent_id(&name)),
250 Err(_) => (None, None),
251 };
252
253 let unwrap_class = class?;
254
255 let dst_name = Agent {
256 agent_class: unwrap_class,
257 agent_id: id.unwrap_or(DEFAULT_AGENT_ID),
258 };
259
260 Some(dst_name)
261}
262
263pub fn create_unsubscription_to_forward(
264 source: &Agent,
265 name: &AgentClass,
266 id: Option<u64>,
267 to_conn: u64,
268) -> ProtoMessage {
269 let mut metadata = HashMap::new();
273 metadata.insert(MetadataType::ForwardTo.to_string(), to_conn.to_string());
274
275 create_unsubscription(source, name, id, metadata)
276}
277
278pub fn process_name(name: &Option<ProtoAgentId>) -> Result<AgentClass, MessageError> {
279 if name.is_none() {
280 error! {"unable to parse message, name not found"};
281 return Err(MessageError::NameNotFound);
282 }
283
284 let name = name.unwrap();
285 if name.class.is_none() {
286 error! {"unable to parse message, class not found"};
287 return Err(MessageError::ClassNotFound);
288 }
289
290 let class = name.class.unwrap();
291
292 if class.group.is_none() {
293 error! {"unable to parse message, group not found"};
294 return Err(MessageError::GroupNotFound);
295 }
296
297 let group = class.group.unwrap();
298
299 let class = AgentClass {
300 organization: group.organization,
301 namespace: group.namespace,
302 agent_class: class.class,
303 };
304
305 Ok(class)
306}
307
308pub fn get_agent_id(name: &Option<ProtoAgentId>) -> Option<u64> {
309 match name {
310 None => None,
311 Some(n) => n.id,
312 }
313}
314
315pub fn get_fanout(pubmsg: &ProtoPublish) -> u32 {
316 pubmsg.fanout
317}
318
319pub fn get_payload(pubmsg: &ProtoPublish) -> &[u8] {
320 &pubmsg.msg.as_ref().unwrap().blob
321}