Skip to main content

slim_datapath/
errors.rs

1// Copyright AGNTCY Contributors (https://github.com/agntcy)
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::api::ProtoSessionMessageType;
5use crate::api::proto::dataplane::v1::Message;
6use crate::messages::{Name, utils::MessageError};
7use slim_config::grpc::errors::ConfigError;
8use thiserror::Error;
9
10/// DataPath and subscription table errors merged into a single enum.
11#[derive(Error, Debug)]
12pub enum DataPathError {
13    // Connection lifecycle
14    #[error("connection error")]
15    ConnectionError,
16    #[error("disconnection error")]
17    DisconnectionError(u64),
18    #[error("grpc error")]
19    GrpcError(#[from] tonic::Status),
20
21    // Message classification / validation
22    #[error("unknown message type")]
23    UnknownMsgType,
24    #[error("invalid message: {0}")]
25    InvalidMessage(MessageError),
26
27    // Subscription / matching
28    #[error("no matching found for {0}")]
29    NoMatch(Name),
30    #[error("subscription not found")]
31    SubscriptionNotFound(Name),
32    #[error("subscription id not found: {0}")]
33    SubscriptionIdNotFound(u64),
34    #[error("id not found: {0}")]
35    IdNotFound(u64),
36
37    // Connection lookup
38    #[error("connection not found: {0}")]
39    ConnectionNotFound(u64),
40    #[error("connection id not found: {0}")]
41    ConnectionIdNotFound(u64),
42
43    // Processing
44    #[error("malformed message")]
45    MalformedMessage(#[from] MessageError),
46    #[error("message processing error: {0}")]
47    ProcessingError(MessageError),
48    #[error("error adding connection to connection table")]
49    ConnectionTableAddError,
50    #[error("message processing error: {source}")]
51    MessageProcessingError {
52        #[source]
53        source: Box<DataPathError>,
54        msg: Box<Message>,
55    },
56
57    // Configuration error
58    #[error("configuration error")]
59    ConfigurationError(#[from] ConfigError),
60
61    // Remote subscription ACK errors
62    #[error("remote subscription ack timed out after {0} retries")]
63    RemoteSubscriptionAckTimeout(u32),
64
65    #[error("remote subscription ack returned error: {0}")]
66    RemoteSubscriptionAckError(String),
67
68    // Shutdown errors
69    #[error("data path is already closed")]
70    AlreadyClosedError,
71    #[error("data plane is shutting down")]
72    ShuttingDownError,
73    #[error("timeout during shutdown")]
74    ShutdownTimeoutError,
75}
76
77#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
78pub struct MessageContext {
79    pub message_id: u32,
80    pub session_id: u32,
81    pub session_message_type: i32,
82}
83
84impl MessageContext {
85    pub fn from_msg(msg: &Message) -> Option<Self> {
86        msg.try_get_session_header().map(|header| Self {
87            message_id: header.get_message_id(),
88            session_id: header.get_session_id(),
89            session_message_type: header.session_message_type().into(),
90        })
91    }
92
93    pub fn get_session_message_type(&self) -> ProtoSessionMessageType {
94        self.session_message_type
95            .try_into()
96            .unwrap_or(ProtoSessionMessageType::Unspecified)
97    }
98}
99
100/// A unified error payload that includes an error message and optional session context.
101/// This type is used to serialize/deserialize errors sent over gRPC with consistent JSON structure.
102#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
103pub struct ErrorPayload {
104    pub error: String,
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub session_context: Option<MessageContext>,
107}
108
109impl std::fmt::Display for ErrorPayload {
110    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111        write!(f, "ErrorPayload: {}", self.error)?;
112        match &self.session_context {
113            Some(ctx) => write!(
114                f,
115                " (session_id={}, message_id={}, session_message_type={:?})",
116                ctx.session_id,
117                ctx.message_id,
118                ctx.get_session_message_type()
119            ),
120            None => Ok(()),
121        }
122    }
123}
124
125impl ErrorPayload {
126    /// Create a new error payload
127    pub fn new(error: String, session_context: Option<MessageContext>) -> Self {
128        Self {
129            error,
130            session_context,
131        }
132    }
133
134    /// Convert to JSON string for transmission
135    pub fn to_json_string(&self) -> String {
136        serde_json::to_string(self).expect("ErrorPayload should be serializable")
137    }
138
139    /// Parse from JSON string
140    pub fn from_json_str(s: &str) -> Option<Self> {
141        serde_json::from_str(s).ok()
142    }
143}