agp_datapath/messages/
utils.rs

1// SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates.
2// SPDX-License-Identifier: Apache-2.0
3
4use core::fmt;
5use std::collections::HashMap;
6
7use super::encoder::{Agent, AgentClass};
8use crate::pubsub::{
9    Content, ProtoAgentClass, ProtoAgentGroup, ProtoAgentId, ProtoMessage, ProtoPublish,
10    ProtoPublishType, ProtoSubscribe, ProtoSubscribeType, ProtoUnsubscribe, ProtoUnsubscribeType,
11};
12
13use thiserror::Error;
14use tracing::error;
15
16#[derive(Error, Debug, PartialEq)]
17pub enum MessageError {
18    #[error("name not found")]
19    NameNotFound,
20    #[error("class not found")]
21    ClassNotFound,
22    #[error("group not found")]
23    GroupNotFound,
24}
25
26pub enum CommandType {
27    ReceivedFrom,
28    ForwardTo,
29    IncomingConnection,
30    Unknown,
31}
32
33impl fmt::Display for CommandType {
34    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
35        match self {
36            CommandType::ReceivedFrom => write!(f, "ReceivedFrom"),
37            CommandType::ForwardTo => write!(f, "ForwardTo"),
38            CommandType::IncomingConnection => write!(f, "IncomingConnection"),
39            CommandType::Unknown => write!(f, "Unkwon"),
40        }
41    }
42}
43
44fn create_source(source: &Agent) -> Option<ProtoAgentId> {
45    Some(ProtoAgentId {
46        class: Some(ProtoAgentClass {
47            group: Some(ProtoAgentGroup {
48                organization: source.agent_class.organization,
49                namespace: source.agent_class.namespace,
50            }),
51            class: source.agent_class.agent_class,
52        }),
53        id: Some(source.agent_id),
54    })
55}
56
57fn create_name(name: &AgentClass, id: Option<u64>) -> Option<ProtoAgentId> {
58    Some(ProtoAgentId {
59        class: Some(ProtoAgentClass {
60            group: Some(ProtoAgentGroup {
61                organization: name.organization,
62                namespace: name.namespace,
63            }),
64            class: name.agent_class,
65        }),
66        id,
67    })
68}
69
70pub fn create_subscription(
71    source: &Agent,
72    name: &AgentClass,
73    id: Option<u64>,
74    metadata: HashMap<String, String>,
75) -> ProtoMessage {
76    ProtoMessage {
77        metadata,
78        message_type: Some(ProtoSubscribeType(ProtoSubscribe {
79            source: create_source(source),
80            name: create_name(name, id),
81        })),
82    }
83}
84
85pub fn create_unsubscription(
86    source: &Agent,
87    name: &AgentClass,
88    id: Option<u64>,
89    metadata: HashMap<String, String>,
90) -> ProtoMessage {
91    ProtoMessage {
92        metadata,
93        message_type: Some(ProtoUnsubscribeType(ProtoUnsubscribe {
94            source: create_source(source),
95            name: create_name(name, id),
96        })),
97    }
98}
99
100pub fn create_publication(
101    source: &Agent,
102    name: &AgentClass,
103    id: Option<u64>,
104    metadata: HashMap<String, String>,
105    fanout: u32,
106    content_type: &str,
107    blob: Vec<u8>,
108) -> ProtoMessage {
109    ProtoMessage {
110        metadata,
111        message_type: Some(ProtoPublishType(ProtoPublish {
112            source: create_source(source),
113            name: create_name(name, id),
114            fanout,
115            msg: Some(Content {
116                content_type: content_type.to_string(),
117                blob,
118            }),
119        })),
120    }
121}
122
123pub fn create_subscription_from(
124    name: &AgentClass,
125    id: Option<u64>,
126    from_conn: u64,
127) -> ProtoMessage {
128    // this message is used to set the state inside the local subscription table.
129    // it emulates the reception of a subscription message from a remote end point through
130    // the connection from_conn
131    // this allows to forward pub messages using a standard match on the subscription tables
132    // it works in a similar way to the set_route command in IP: it creates a route to a destion
133    // through a local interface
134
135    // the source field is not used in this case, set it to default
136    let source = Agent::default();
137
138    // add the from_conn in the hashmap of the message
139    let mut metadata = HashMap::new();
140    metadata.insert(CommandType::ReceivedFrom.to_string(), from_conn.to_string());
141
142    // create a subscription with the metadata
143    // the result will be that the subscription will be added to the local
144    // subscription table with connection = from_conn
145    create_subscription(&source, name, id, metadata)
146}
147
148pub fn create_subscription_to_forward(
149    source: &Agent,
150    name: &AgentClass,
151    id: Option<u64>,
152    to_conn: u64,
153) -> ProtoMessage {
154    // this subscription can be received only from a local connection
155    // when this message is received the subscription is set in the local table
156    // and forwarded to the connection to_conn to set the subscription remotely
157    // before forward the subscription the metadata map is cleaned up
158
159    // add the from_conn in the hashmap of the message
160    let mut metadata = HashMap::new();
161    metadata.insert(CommandType::ForwardTo.to_string(), to_conn.to_string());
162
163    create_subscription(source, name, id, metadata)
164}
165
166pub fn create_unsubscription_from(
167    name: &AgentClass,
168    id: Option<u64>,
169    from_conn: u64,
170) -> ProtoMessage {
171    // same as subscription from but it removes the state
172
173    // the source field is not used in this case, set it to default
174    let source = Agent::default();
175
176    // add the from_conn in the hashmap of the message
177    let mut metadata = HashMap::new();
178    metadata.insert(CommandType::ReceivedFrom.to_string(), from_conn.to_string());
179
180    // create the unsubscription with the metadata
181    create_unsubscription(&source, name, id, metadata)
182}
183
184pub fn add_incoming_connection(msg: &mut ProtoMessage, in_connection: u64) {
185    msg.metadata.insert(
186        CommandType::IncomingConnection.to_string(),
187        in_connection.to_string(),
188    );
189}
190
191pub fn get_incoming_connection(msg: &ProtoMessage) -> Option<u64> {
192    match msg
193        .metadata
194        .get(&CommandType::IncomingConnection.to_string())
195    {
196        None => None,
197        Some(conn) => conn.parse::<u64>().ok(),
198    }
199}
200
201pub fn create_unsubscription_to_forward(
202    source: &Agent,
203    name: &AgentClass,
204    id: Option<u64>,
205    to_conn: u64,
206) -> ProtoMessage {
207    // same as the subscription to forward but it remove the state
208
209    // add the from_conn in the hashmap of the message
210    let mut metadata = HashMap::new();
211    metadata.insert(CommandType::ForwardTo.to_string(), to_conn.to_string());
212
213    create_unsubscription(source, name, id, metadata)
214}
215
216pub fn process_name(name: &Option<ProtoAgentId>) -> Result<AgentClass, MessageError> {
217    if name.is_none() {
218        error! {"unable to parse message, name not found"};
219        return Err(MessageError::NameNotFound);
220    }
221
222    let name = name.unwrap();
223    if name.class.is_none() {
224        error! {"unable to parse message, class not found"};
225        return Err(MessageError::ClassNotFound);
226    }
227
228    let class = name.class.unwrap();
229
230    if class.group.is_none() {
231        error! {"unable to parse message, group not found"};
232        return Err(MessageError::GroupNotFound);
233    }
234
235    let group = class.group.unwrap();
236
237    let class = AgentClass {
238        organization: group.organization,
239        namespace: group.namespace,
240        agent_class: class.class,
241    };
242
243    Ok(class)
244}
245
246pub fn get_agent_id(name: &Option<ProtoAgentId>) -> Option<u64> {
247    match name {
248        None => None,
249        Some(n) => n.id,
250    }
251}
252
253pub fn get_fanout(pubmsg: &ProtoPublish) -> u32 {
254    pubmsg.fanout
255}
256
257pub fn get_payload(pubmsg: &ProtoPublish) -> &[u8] {
258    &pubmsg.msg.as_ref().unwrap().blob
259}