Skip to main content

slim_datapath/
errors.rs

1// Copyright AGNTCY Contributors (https://github.com/agntcy)
2// SPDX-License-Identifier: Apache-2.0
3
4use std::string;
5
6use crate::api::ProtoName;
7use crate::api::ProtoSessionMessageType;
8use crate::api::proto::dataplane::v1::Message;
9use crate::messages::utils::MessageError;
10use slim_config::errors::ConfigError;
11use thiserror::Error;
12
13/// DataPath and subscription table errors merged into a single enum.
14#[derive(Error, Debug)]
15pub enum DataPathError {
16    // Connection lifecycle
17    #[error("connection error")]
18    ConnectionError,
19    #[error("disconnection error")]
20    DisconnectionError(u64),
21    #[error("grpc error")]
22    GrpcError(#[from] tonic::Status),
23    #[error("link negotiation error")]
24    NegotiationError(String),
25
26    // Message classification / validation
27    #[error("unknown message type")]
28    UnknownMsgType,
29    #[error("invalid message: {0}")]
30    InvalidMessage(MessageError),
31    #[error("invalid name id format: {0}")]
32    InvalidNameIdFormat(String),
33    #[error("invalid name format: {0}")]
34    InvalidNameFormat(String),
35
36    // Subscription / matching
37    #[error("no matching found for [{:x}, {:x}, {:x}, {}]", .0, .1, .2, .3)]
38    NoMatchEncoded(u64, u64, u64, String),
39    #[error("subscription not found")]
40    SubscriptionNotFound(ProtoName),
41    #[error("subscription id not found: {0}")]
42    SubscriptionIdNotFound(u64),
43    #[error("id not found: {0}")]
44    IdNotFound(string::String),
45
46    // Connection lookup
47    #[error("connection not found: {0}")]
48    ConnectionNotFound(u64),
49    #[error("connection id not found: {0}")]
50    ConnectionIdNotFound(u64),
51
52    // Processing
53    #[error("malformed message")]
54    MalformedMessage(#[from] MessageError),
55    #[error("message processing error: {0}")]
56    ProcessingError(MessageError),
57    #[error("error adding connection to connection table")]
58    ConnectionTableAddError,
59    #[error("message processing error: {source}")]
60    MessageProcessingError {
61        #[source]
62        source: Box<DataPathError>,
63        msg: Box<Message>,
64    },
65
66    // Configuration error
67    #[error("configuration error")]
68    ConfigurationError(#[from] ConfigError),
69
70    // Remote subscription ACK errors
71    #[error("remote subscription ack timed out after {0} retries")]
72    RemoteSubscriptionAckTimeout(u32),
73
74    #[error("remote subscription ack returned error: {0}")]
75    RemoteSubscriptionAckError(String),
76
77    // Shutdown errors
78    #[error("data path is already closed")]
79    AlreadyClosedError,
80    #[error("data plane is shutting down")]
81    ShuttingDownError,
82    #[error("timeout during shutdown")]
83    ShutdownTimeoutError,
84
85    #[error("SLIM header integrity: {0}")]
86    HeaderIntegrity(#[from] crate::header_mac::HeaderMacError),
87
88    #[error("header MAC requires completed link negotiation on connection {0}")]
89    HeaderMacAwaitingLinkNegotiation(u64),
90
91    #[error("inter-node ephemeral key generation failed")]
92    LinkKeyGeneration,
93}
94
95#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
96pub struct MessageContext {
97    pub message_id: u32,
98    pub session_id: u32,
99    pub session_message_type: i32,
100}
101
102impl MessageContext {
103    pub fn from_msg(msg: &Message) -> Option<Self> {
104        msg.try_get_session_header().map(|header| Self {
105            message_id: header.get_message_id(),
106            session_id: header.get_session_id(),
107            session_message_type: header.session_message_type().into(),
108        })
109    }
110
111    pub fn get_session_message_type(&self) -> ProtoSessionMessageType {
112        self.session_message_type
113            .try_into()
114            .unwrap_or(ProtoSessionMessageType::Unspecified)
115    }
116}
117
118/// A unified error payload that includes an error message and optional session context.
119/// This type is used to serialize/deserialize errors sent over gRPC with consistent JSON structure.
120#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
121pub struct ErrorPayload {
122    pub error: String,
123    #[serde(skip_serializing_if = "Option::is_none")]
124    pub session_context: Option<MessageContext>,
125}
126
127impl std::fmt::Display for ErrorPayload {
128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129        write!(f, "ErrorPayload: {}", self.error)?;
130        match &self.session_context {
131            Some(ctx) => write!(
132                f,
133                " (session_id={}, message_id={}, session_message_type={:?})",
134                ctx.session_id,
135                ctx.message_id,
136                ctx.get_session_message_type()
137            ),
138            None => Ok(()),
139        }
140    }
141}
142
143impl ErrorPayload {
144    /// Create a new error payload
145    pub fn new(error: String, session_context: Option<MessageContext>) -> Self {
146        Self {
147            error,
148            session_context,
149        }
150    }
151
152    /// Convert to JSON string for transmission
153    pub fn to_json_string(&self) -> String {
154        serde_json::to_string(self).expect("ErrorPayload should be serializable")
155    }
156
157    /// Parse from JSON string
158    pub fn from_json_str(s: &str) -> Option<Self> {
159        serde_json::from_str(s).ok()
160    }
161}