1use crate::api::ProtoSessionMessageType;
5use crate::api::proto::dataplane::v1::Message;
6use crate::messages::{Name, utils::MessageError};
7use slim_config::grpc::errors::ConfigError;
8use thiserror::Error;
9
10#[derive(Error, Debug)]
12pub enum DataPathError {
13 #[error("connection error")]
15 ConnectionError,
16 #[error("disconnection error")]
17 DisconnectionError(u64),
18 #[error("grpc error")]
19 GrpcError(#[from] tonic::Status),
20
21 #[error("unknown message type")]
23 UnknownMsgType,
24 #[error("invalid message: {0}")]
25 InvalidMessage(MessageError),
26
27 #[error("no matching found for {0}")]
29 NoMatch(Name),
30 #[error("subscription not found")]
31 SubscriptionNotFound(Name),
32 #[error("subscription id not found: {0}")]
33 SubscriptionIdNotFound(u64),
34 #[error("id not found: {0}")]
35 IdNotFound(u64),
36
37 #[error("connection not found: {0}")]
39 ConnectionNotFound(u64),
40 #[error("connection id not found: {0}")]
41 ConnectionIdNotFound(u64),
42
43 #[error("malformed message")]
45 MalformedMessage(#[from] MessageError),
46 #[error("message processing error: {0}")]
47 ProcessingError(MessageError),
48 #[error("error adding connection to connection table")]
49 ConnectionTableAddError,
50 #[error("message processing error: {source}")]
51 MessageProcessingError {
52 #[source]
53 source: Box<DataPathError>,
54 msg: Box<Message>,
55 },
56
57 #[error("configuration error")]
59 ConfigurationError(#[from] ConfigError),
60
61 #[error("remote subscription ack timed out after {0} retries")]
63 RemoteSubscriptionAckTimeout(u32),
64
65 #[error("remote subscription ack returned error: {0}")]
66 RemoteSubscriptionAckError(String),
67
68 #[error("data path is already closed")]
70 AlreadyClosedError,
71 #[error("data plane is shutting down")]
72 ShuttingDownError,
73 #[error("timeout during shutdown")]
74 ShutdownTimeoutError,
75}
76
77#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
78pub struct MessageContext {
79 pub message_id: u32,
80 pub session_id: u32,
81 pub session_message_type: i32,
82}
83
84impl MessageContext {
85 pub fn from_msg(msg: &Message) -> Option<Self> {
86 msg.try_get_session_header().map(|header| Self {
87 message_id: header.get_message_id(),
88 session_id: header.get_session_id(),
89 session_message_type: header.session_message_type().into(),
90 })
91 }
92
93 pub fn get_session_message_type(&self) -> ProtoSessionMessageType {
94 self.session_message_type
95 .try_into()
96 .unwrap_or(ProtoSessionMessageType::Unspecified)
97 }
98}
99
100#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
103pub struct ErrorPayload {
104 pub error: String,
105 #[serde(skip_serializing_if = "Option::is_none")]
106 pub session_context: Option<MessageContext>,
107}
108
109impl std::fmt::Display for ErrorPayload {
110 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111 write!(f, "ErrorPayload: {}", self.error)?;
112 match &self.session_context {
113 Some(ctx) => write!(
114 f,
115 " (session_id={}, message_id={}, session_message_type={:?})",
116 ctx.session_id,
117 ctx.message_id,
118 ctx.get_session_message_type()
119 ),
120 None => Ok(()),
121 }
122 }
123}
124
125impl ErrorPayload {
126 pub fn new(error: String, session_context: Option<MessageContext>) -> Self {
128 Self {
129 error,
130 session_context,
131 }
132 }
133
134 pub fn to_json_string(&self) -> String {
136 serde_json::to_string(self).expect("ErrorPayload should be serializable")
137 }
138
139 pub fn from_json_str(s: &str) -> Option<Self> {
141 serde_json::from_str(s).ok()
142 }
143}