agp_datapath/messages/
utils.rs

1// SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates.
2// SPDX-License-Identifier: Apache-2.0
3
4use core::fmt;
5use std::collections::HashMap;
6
7use super::encoder::{Agent, AgentClass, DEFAULT_AGENT_ID};
8use crate::pubsub::{
9    Content, ProtoAgentClass, ProtoAgentGroup, ProtoAgentId, ProtoMessage, ProtoPublish,
10    ProtoPublishType, ProtoSubscribe, ProtoSubscribeType, ProtoUnsubscribe, ProtoUnsubscribeType,
11};
12
13use thiserror::Error;
14use tracing::error;
15
16#[derive(Error, Debug, PartialEq)]
17pub enum MessageError {
18    #[error("name not found")]
19    NameNotFound,
20    #[error("class not found")]
21    ClassNotFound,
22    #[error("group not found")]
23    GroupNotFound,
24}
25
26pub enum MetadataType {
27    ReceivedFrom,
28    ForwardTo,
29    IncomingConnection,
30    Error,
31    Unknown,
32}
33
34impl fmt::Display for MetadataType {
35    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
36        match self {
37            MetadataType::ReceivedFrom => write!(f, "ReceivedFrom"),
38            MetadataType::ForwardTo => write!(f, "ForwardTo"),
39            MetadataType::IncomingConnection => write!(f, "IncomingConnection"),
40            MetadataType::Error => write!(f, "Error"),
41            MetadataType::Unknown => write!(f, "Unknown"),
42        }
43    }
44}
45
46fn create_source(source: &Agent) -> Option<ProtoAgentId> {
47    Some(ProtoAgentId {
48        class: Some(ProtoAgentClass {
49            group: Some(ProtoAgentGroup {
50                organization: source.agent_class.organization,
51                namespace: source.agent_class.namespace,
52            }),
53            class: source.agent_class.agent_class,
54        }),
55        id: Some(source.agent_id),
56    })
57}
58
59fn create_name(name: &AgentClass, id: Option<u64>) -> Option<ProtoAgentId> {
60    Some(ProtoAgentId {
61        class: Some(ProtoAgentClass {
62            group: Some(ProtoAgentGroup {
63                organization: name.organization,
64                namespace: name.namespace,
65            }),
66            class: name.agent_class,
67        }),
68        id,
69    })
70}
71
72pub fn create_subscription(
73    source: &Agent,
74    name: &AgentClass,
75    id: Option<u64>,
76    metadata: HashMap<String, String>,
77) -> ProtoMessage {
78    ProtoMessage {
79        metadata,
80        message_type: Some(ProtoSubscribeType(ProtoSubscribe {
81            source: create_source(source),
82            name: create_name(name, id),
83        })),
84    }
85}
86
87pub fn create_unsubscription(
88    source: &Agent,
89    name: &AgentClass,
90    id: Option<u64>,
91    metadata: HashMap<String, String>,
92) -> ProtoMessage {
93    ProtoMessage {
94        metadata,
95        message_type: Some(ProtoUnsubscribeType(ProtoUnsubscribe {
96            source: create_source(source),
97            name: create_name(name, id),
98        })),
99    }
100}
101
102pub fn create_publication(
103    source: &Agent,
104    name: &AgentClass,
105    id: Option<u64>,
106    metadata: HashMap<String, String>,
107    fanout: u32,
108    content_type: &str,
109    blob: Vec<u8>,
110) -> ProtoMessage {
111    ProtoMessage {
112        metadata,
113        message_type: Some(ProtoPublishType(ProtoPublish {
114            source: create_source(source),
115            name: create_name(name, id),
116            fanout,
117            msg: Some(Content {
118                content_type: content_type.to_string(),
119                blob,
120            }),
121        })),
122    }
123}
124
125pub fn create_subscription_from(
126    name: &AgentClass,
127    id: Option<u64>,
128    from_conn: u64,
129) -> ProtoMessage {
130    // this message is used to set the state inside the local subscription table.
131    // it emulates the reception of a subscription message from a remote end point through
132    // the connection from_conn
133    // this allows to forward pub messages using a standard match on the subscription tables
134    // it works in a similar way to the set_route command in IP: it creates a route to a destion
135    // through a local interface
136
137    // the source field is not used in this case, set it to default
138    let source = Agent::default();
139
140    // add the from_conn in the hashmap of the message
141    let mut metadata = HashMap::new();
142    metadata.insert(
143        MetadataType::ReceivedFrom.to_string(),
144        from_conn.to_string(),
145    );
146
147    // create a subscription with the metadata
148    // the result will be that the subscription will be added to the local
149    // subscription table with connection = from_conn
150    create_subscription(&source, name, id, metadata)
151}
152
153pub fn create_subscription_to_forward(
154    source: &Agent,
155    name: &AgentClass,
156    id: Option<u64>,
157    to_conn: u64,
158) -> ProtoMessage {
159    // this subscription can be received only from a local connection
160    // when this message is received the subscription is set in the local table
161    // and forwarded to the connection to_conn to set the subscription remotely
162    // before forward the subscription the metadata map is cleaned up
163
164    // add the from_conn in the hashmap of the message
165    let mut metadata = HashMap::new();
166    metadata.insert(MetadataType::ForwardTo.to_string(), to_conn.to_string());
167
168    create_subscription(source, name, id, metadata)
169}
170
171pub fn create_unsubscription_from(
172    name: &AgentClass,
173    id: Option<u64>,
174    from_conn: u64,
175) -> ProtoMessage {
176    // same as subscription from but it removes the state
177
178    // the source field is not used in this case, set it to default
179    let source = Agent::default();
180
181    // add the from_conn in the hashmap of the message
182    let mut metadata = HashMap::new();
183    metadata.insert(
184        MetadataType::ReceivedFrom.to_string(),
185        from_conn.to_string(),
186    );
187
188    // create the unsubscription with the metadata
189    create_unsubscription(&source, name, id, metadata)
190}
191
192pub fn add_incoming_connection(msg: &mut ProtoMessage, in_connection: u64) {
193    msg.metadata.insert(
194        MetadataType::IncomingConnection.to_string(),
195        in_connection.to_string(),
196    );
197}
198
199pub fn get_incoming_connection(msg: &ProtoMessage) -> Option<u64> {
200    match msg
201        .metadata
202        .get(&MetadataType::IncomingConnection.to_string())
203    {
204        None => None,
205        Some(conn) => conn.parse::<u64>().ok(),
206    }
207}
208
209pub fn get_source(msg: &ProtoMessage) -> Option<Agent> {
210    let source = match &msg.message_type {
211        Some(msg_type) => match msg_type {
212            ProtoPublishType(publish) => publish.source,
213            ProtoSubscribeType(sub) => sub.source,
214            ProtoUnsubscribeType(unsub) => unsub.source,
215        },
216        None => None,
217    };
218
219    source?;
220
221    let (class, id) = match process_name(&source) {
222        Ok(class) => (Some(class), get_agent_id(&source)),
223        Err(_) => (None, None),
224    };
225
226    let unwrap_class = class?;
227
228    let src_name = Agent {
229        agent_class: unwrap_class,
230        agent_id: id.unwrap_or(DEFAULT_AGENT_ID),
231    };
232
233    Some(src_name)
234}
235
236pub fn get_name(msg: &ProtoMessage) -> Option<Agent> {
237    let name = match &msg.message_type {
238        Some(msg_type) => match msg_type {
239            ProtoPublishType(publish) => publish.name,
240            ProtoSubscribeType(sub) => sub.name,
241            ProtoUnsubscribeType(unsub) => unsub.name,
242        },
243        None => None,
244    };
245
246    name?;
247
248    let (class, id) = match process_name(&name) {
249        Ok(class) => (Some(class), get_agent_id(&name)),
250        Err(_) => (None, None),
251    };
252
253    let unwrap_class = class?;
254
255    let dst_name = Agent {
256        agent_class: unwrap_class,
257        agent_id: id.unwrap_or(DEFAULT_AGENT_ID),
258    };
259
260    Some(dst_name)
261}
262
263pub fn create_unsubscription_to_forward(
264    source: &Agent,
265    name: &AgentClass,
266    id: Option<u64>,
267    to_conn: u64,
268) -> ProtoMessage {
269    // same as the subscription to forward but it remove the state
270
271    // add the from_conn in the hashmap of the message
272    let mut metadata = HashMap::new();
273    metadata.insert(MetadataType::ForwardTo.to_string(), to_conn.to_string());
274
275    create_unsubscription(source, name, id, metadata)
276}
277
278pub fn process_name(name: &Option<ProtoAgentId>) -> Result<AgentClass, MessageError> {
279    if name.is_none() {
280        error! {"unable to parse message, name not found"};
281        return Err(MessageError::NameNotFound);
282    }
283
284    let name = name.unwrap();
285    if name.class.is_none() {
286        error! {"unable to parse message, class not found"};
287        return Err(MessageError::ClassNotFound);
288    }
289
290    let class = name.class.unwrap();
291
292    if class.group.is_none() {
293        error! {"unable to parse message, group not found"};
294        return Err(MessageError::GroupNotFound);
295    }
296
297    let group = class.group.unwrap();
298
299    let class = AgentClass {
300        organization: group.organization,
301        namespace: group.namespace,
302        agent_class: class.class,
303    };
304
305    Ok(class)
306}
307
308pub fn get_agent_id(name: &Option<ProtoAgentId>) -> Option<u64> {
309    match name {
310        None => None,
311        Some(n) => n.id,
312    }
313}
314
315pub fn get_fanout(pubmsg: &ProtoPublish) -> u32 {
316    pubmsg.fanout
317}
318
319pub fn get_payload(pubmsg: &ProtoPublish) -> &[u8] {
320    &pubmsg.msg.as_ref().unwrap().blob
321}