1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
//! Unified SDK error type.
use thiserror::Error;
/// Unified SDK error.
///
/// Marked `#[non_exhaustive]` so adding a new variant is a minor-version
/// change — external `match` statements must include a wildcard arm.
/// The most recent variant additions (`Sampled`, `Unrouted`) tightened
/// `From<IngestionError>` so structured backpressure / sampling /
/// no-route signals stop being funnelled into a stringly-typed
/// `Ingestion(String)`. Callers that previously matched on the
/// string content of `Ingestion` need to be updated to match the
/// new variants.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum SdkError {
#[error("node has been shut down")]
Shutdown,
/// Generic ingestion failure that doesn't map to a more
/// specific variant.
///
/// Pre-fix, every `IngestionError` was funnelled here
/// — `Backpressure`, `Sampled`, `Unrouted`, and shutdown all
/// became `Ingestion("…")` and callers had to string-match to
/// pick a remediation. Today's `From<IngestionError>` impl
/// routes the structured variants below; this `String` variant
/// stays as a fallback for any future `IngestionError`
/// addition and for callers that already pattern-match on it.
#[error("ingestion failed: {0}")]
Ingestion(String),
/// Event was deliberately dropped by a sampling / decimation
/// policy. Retrying is pointless — the producer should accept
/// the drop or change the sampling rate.
#[error("event dropped due to sampling")]
Sampled,
/// No routable shard for the event. Typically a topology-
/// transient state (a concurrent scale-down removed the
/// hashed shard id, or the shard is still provisioning).
/// Retry once topology stabilizes; back-off-and-retry on
/// `Backpressure` semantics is the wrong remediation.
#[error("event has no routable shard")]
Unrouted,
#[error("poll failed: {0}")]
Poll(String),
#[error("adapter error: {0}")]
Adapter(String),
#[error("serialization error: {0}")]
Serialization(#[from] serde_json::Error),
#[error("invalid configuration: {0}")]
Config(String),
#[error("mesh transport not available")]
NoMesh,
/// Stream's per-stream in-flight window is full. The caller's events
/// were NOT sent — daemons decide whether to drop, retry, or buffer
/// at the app layer. See `Mesh::send_with_retry` / `send_blocking`
/// for the two built-in policies.
#[error("stream backpressure: queue full")]
Backpressure,
/// Stream's peer session is gone (peer disconnected, never
/// connected, or the stream was closed).
#[error("stream not connected")]
NotConnected,
/// A publisher's `Ack` rejected a Subscribe / Unsubscribe
/// request. `None` means the rejection arrived without a
/// structured reason. Gated behind `net` because
/// `AckReason` lives in the network-transport surface;
/// non-`net` SDK builds (e.g. redis-only consumer helpers)
/// don't compile this variant.
#[cfg(feature = "net")]
#[error("channel membership rejected: {0:?}")]
ChannelRejected(Option<net::adapter::net::AckReason>),
/// NAT-traversal failure — a reflex probe, hole-punch, or
/// port-mapping path couldn't complete. Always represents a
/// missed *optimization*, never a connectivity failure:
/// every NATed peer remains reachable through the routed-
/// handshake path. `kind` is a stable discriminator
/// matching the `nat-traversal` crate's error vocabulary
/// (`reflex-timeout` / `peer-not-reachable` / `transport` /
/// `rendezvous-no-relay` / `rendezvous-rejected` /
/// `punch-failed` / `port-map-unavailable` / `unsupported`).
#[cfg(feature = "nat-traversal")]
#[error("traversal {kind}: {message}")]
Traversal {
/// Stable machine-readable discriminator — same value as
/// `TraversalError::kind()`. Never localized; never
/// changed once a variant has shipped.
kind: &'static str,
/// Human-readable detail (e.g. the underlying socket
/// error on a `transport` failure). Empty for kinds
/// that have no variable detail.
message: String,
},
}
#[cfg(feature = "net")]
impl From<net::adapter::net::StreamError> for SdkError {
fn from(e: net::adapter::net::StreamError) -> Self {
use net::adapter::net::StreamError;
match e {
StreamError::Backpressure => SdkError::Backpressure,
StreamError::NotConnected => SdkError::NotConnected,
StreamError::Transport(msg) => SdkError::Adapter(msg),
}
}
}
impl From<net::error::IngestionError> for SdkError {
fn from(e: net::error::IngestionError) -> Self {
use net::error::IngestionError;
// Pre-fix this stringified every variant into
// `SdkError::Ingestion(String)`, forcing callers to match
// on the message text to distinguish "ring buffer full,
// retry with backoff" (Backpressure) from "event dropped
// by sampling, retry futile" (Sampled) from "no routable
// shard, retry once topology stabilizes" (Unrouted).
// Each maps to a structured SdkError variant so the
// remediation choice is encoded in the type system.
match e {
IngestionError::Backpressure => SdkError::Backpressure,
IngestionError::Sampled => SdkError::Sampled,
IngestionError::Unrouted => SdkError::Unrouted,
IngestionError::ShuttingDown => SdkError::Shutdown,
IngestionError::Serialization(err) => SdkError::Serialization(err),
}
}
}
impl From<net::error::ConsumerError> for SdkError {
fn from(e: net::error::ConsumerError) -> Self {
SdkError::Poll(e.to_string())
}
}
impl From<net::error::AdapterError> for SdkError {
fn from(e: net::error::AdapterError) -> Self {
SdkError::Adapter(e.to_string())
}
}
#[cfg(feature = "nat-traversal")]
impl From<net::adapter::net::traversal::TraversalError> for SdkError {
fn from(e: net::adapter::net::traversal::TraversalError) -> Self {
SdkError::Traversal {
kind: e.kind(),
message: e.to_string(),
}
}
}
pub type Result<T> = std::result::Result<T, SdkError>;
#[cfg(test)]
mod tests {
use super::*;
use net::error::IngestionError;
/// Each `IngestionError` variant must map to a
/// structured `SdkError` so callers don't have to string-
/// match the message text to pick a remediation.
#[test]
fn ingestion_backpressure_maps_to_structured_backpressure() {
let sdk: SdkError = IngestionError::Backpressure.into();
assert!(
matches!(sdk, SdkError::Backpressure),
"Backpressure must map to SdkError::Backpressure, got {:?}",
sdk
);
}
#[test]
fn ingestion_sampled_maps_to_structured_sampled() {
let sdk: SdkError = IngestionError::Sampled.into();
assert!(
matches!(sdk, SdkError::Sampled),
"Sampled must map to SdkError::Sampled so callers \
know retry is pointless; got {:?}",
sdk
);
}
#[test]
fn ingestion_unrouted_maps_to_structured_unrouted() {
let sdk: SdkError = IngestionError::Unrouted.into();
assert!(
matches!(sdk, SdkError::Unrouted),
"Unrouted must map to SdkError::Unrouted so callers \
know to wait for topology to stabilize; got {:?}",
sdk
);
}
#[test]
fn ingestion_shutdown_maps_to_structured_shutdown() {
let sdk: SdkError = IngestionError::ShuttingDown.into();
assert!(
matches!(sdk, SdkError::Shutdown),
"ShuttingDown must reuse SdkError::Shutdown, got {:?}",
sdk
);
}
#[test]
fn ingestion_serialization_preserves_structured_serialization() {
// Construct an IngestionError::Serialization carrying a
// real serde_json error so the From conversion has
// something to unwrap.
let parse_err: serde_json::Error =
serde_json::from_str::<serde_json::Value>("{ this is not json").unwrap_err();
let sdk: SdkError = IngestionError::Serialization(parse_err).into();
assert!(
matches!(sdk, SdkError::Serialization(_)),
"Serialization must keep its #[from] serde_json::Error, got {:?}",
sdk
);
}
}