Skip to main content

slim_datapath/
errors.rs

1// Copyright AGNTCY Contributors (https://github.com/agntcy)
2// SPDX-License-Identifier: Apache-2.0
3
4use std::string;
5
6use crate::api::ProtoName;
7use crate::api::ProtoSessionMessageType;
8use crate::api::proto::dataplane::v1::Message;
9use crate::messages::utils::MessageError;
10#[cfg(not(target_arch = "wasm32"))]
11use slim_config::errors::ConfigError;
12use thiserror::Error;
13
14/// DataPath and subscription table errors merged into a single enum.
15#[derive(Error, Debug)]
16pub enum DataPathError {
17    // Connection lifecycle
18    #[error("connection error")]
19    ConnectionError,
20    #[error("disconnection error")]
21    DisconnectionError(u64),
22    #[cfg(not(target_arch = "wasm32"))]
23    #[error("grpc error")]
24    GrpcError(#[from] tonic::Status),
25    #[error("link negotiation error")]
26    NegotiationError(String),
27
28    // Message classification / validation
29    #[error("unknown message type")]
30    UnknownMsgType,
31    #[error("invalid message: {0}")]
32    InvalidMessage(MessageError),
33    #[error("invalid name id format: {0}")]
34    InvalidNameIdFormat(String),
35    #[error("invalid name format: {0}")]
36    InvalidNameFormat(String),
37
38    // Subscription / matching
39    #[error("no matching found for [{:x}, {:x}, {:x}, {}]", .0, .1, .2, .3)]
40    NoMatchEncoded(u64, u64, u64, String),
41    #[error("subscription not found")]
42    SubscriptionNotFound(ProtoName),
43    #[error("subscription id not found: {0}")]
44    SubscriptionIdNotFound(u64),
45    #[error("id not found: {0}")]
46    IdNotFound(string::String),
47
48    // Connection lookup
49    #[error("connection not found: {0}")]
50    ConnectionNotFound(u64),
51    #[error("connection id not found: {0}")]
52    ConnectionIdNotFound(u64),
53
54    // Processing
55    #[error("malformed message")]
56    MalformedMessage(#[from] MessageError),
57    #[error("message processing error: {0}")]
58    ProcessingError(MessageError),
59    #[error("connection send error")]
60    ConnectionSendError,
61    #[error("error adding connection to connection table")]
62    ConnectionTableAddError,
63    #[error("message processing error: {source}")]
64    MessageProcessingError {
65        #[source]
66        source: Box<DataPathError>,
67        msg: Box<Message>,
68    },
69
70    // Configuration error
71    #[cfg(not(target_arch = "wasm32"))]
72    #[error("configuration error")]
73    ConfigurationError(#[from] ConfigError),
74
75    // Remote subscription ACK errors
76    #[error("remote subscription ack timed out after {0} retries")]
77    RemoteSubscriptionAckTimeout(u32),
78
79    #[error("remote subscription ack error")]
80    RemoteSubscriptionAckError(String),
81
82    // Shutdown errors
83    #[error("data path is already closed")]
84    AlreadyClosedError,
85    #[error("data plane is shutting down")]
86    ShuttingDownError,
87    #[error("timeout during shutdown")]
88    ShutdownTimeoutError,
89
90    #[cfg(not(target_arch = "wasm32"))]
91    #[error("SLIM header integrity: {0}")]
92    HeaderIntegrity(#[from] crate::header_mac::HeaderMacError),
93
94    #[error("header MAC requires completed link negotiation on connection {0}")]
95    HeaderMacAwaitingLinkNegotiation(u64),
96
97    #[error("inter-node ephemeral key generation failed")]
98    LinkKeyGeneration,
99
100    #[error("message TTL expired")]
101    TtlExpired,
102}
103
/// Session identifiers taken from a message's session header.
///
/// Serializable with serde; [`ErrorPayload`] embeds it as its optional
/// `session_context`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MessageContext {
    /// Message id, as reported by the header's `get_message_id()`.
    pub message_id: u32,
    /// Session id, as reported by the header's `get_session_id()`.
    pub session_id: u32,
    /// Session message type kept as a raw `i32`; decode it with
    /// `get_session_message_type()`.
    pub session_message_type: i32,
}
110
111impl MessageContext {
112    pub fn from_msg(msg: &Message) -> Option<Self> {
113        msg.try_get_session_header().map(|header| Self {
114            message_id: header.get_message_id(),
115            session_id: header.get_session_id(),
116            session_message_type: header.session_message_type().into(),
117        })
118    }
119
120    pub fn get_session_message_type(&self) -> ProtoSessionMessageType {
121        self.session_message_type
122            .try_into()
123            .unwrap_or(ProtoSessionMessageType::Unspecified)
124    }
125}
126
/// A unified error payload that includes an error message and optional session context.
/// This type is used to serialize/deserialize errors sent over gRPC with consistent JSON structure.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ErrorPayload {
    /// Error text; this is what `Display` prints after the `ErrorPayload: ` prefix.
    pub error: String,
    /// Session identifiers of the related message, if any. When this is `None`
    /// the field is left out of the serialized output entirely.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_context: Option<MessageContext>,
}
135
136impl std::fmt::Display for ErrorPayload {
137    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138        write!(f, "ErrorPayload: {}", self.error)?;
139        match &self.session_context {
140            Some(ctx) => write!(
141                f,
142                " (session_id={}, message_id={}, session_message_type={:?})",
143                ctx.session_id,
144                ctx.message_id,
145                ctx.get_session_message_type()
146            ),
147            None => Ok(()),
148        }
149    }
150}
151
152impl ErrorPayload {
153    /// Create a new error payload
154    pub fn new(error: String, session_context: Option<MessageContext>) -> Self {
155        Self {
156            error,
157            session_context,
158        }
159    }
160
161    /// Convert to JSON string for transmission
162    pub fn to_json_string(&self) -> String {
163        serde_json::to_string(self).expect("ErrorPayload should be serializable")
164    }
165
166    /// Parse from JSON string
167    pub fn from_json_str(s: &str) -> Option<Self> {
168        serde_json::from_str(s).ok()
169    }
170}