Skip to main content

net_sdk/
error.rs

1//! Unified SDK error type.
2
3use thiserror::Error;
4
5/// Unified SDK error.
6///
7/// Marked `#[non_exhaustive]` so adding a new variant is a minor-version
8/// change — external `match` statements must include a wildcard arm.
9/// The most recent variant additions (`Sampled`, `Unrouted`) tightened
10/// `From<IngestionError>` so structured backpressure / sampling /
11/// no-route signals stop being funnelled into a stringly-typed
12/// `Ingestion(String)`. Callers that previously matched on the
13/// string content of `Ingestion` need to be updated to match the
14/// new variants.
15#[derive(Error, Debug)]
16#[non_exhaustive]
17pub enum SdkError {
18    #[error("node has been shut down")]
19    Shutdown,
20
21    /// Generic ingestion failure that doesn't map to a more
22    /// specific variant.
23    ///
24    /// Pre-fix, every `IngestionError` was funnelled here
25    /// — `Backpressure`, `Sampled`, `Unrouted`, and shutdown all
26    /// became `Ingestion("…")` and callers had to string-match to
27    /// pick a remediation. Today's `From<IngestionError>` impl
28    /// routes the structured variants below; this `String` variant
29    /// stays as a fallback for any future `IngestionError`
30    /// addition and for callers that already pattern-match on it.
31    #[error("ingestion failed: {0}")]
32    Ingestion(String),
33
34    /// Event was deliberately dropped by a sampling / decimation
35    /// policy. Retrying is pointless — the producer should accept
36    /// the drop or change the sampling rate.
37    #[error("event dropped due to sampling")]
38    Sampled,
39
40    /// No routable shard for the event. Typically a topology-
41    /// transient state (a concurrent scale-down removed the
42    /// hashed shard id, or the shard is still provisioning).
43    /// Retry once topology stabilizes; back-off-and-retry on
44    /// `Backpressure` semantics is the wrong remediation.
45    #[error("event has no routable shard")]
46    Unrouted,
47
48    #[error("poll failed: {0}")]
49    Poll(String),
50
51    #[error("adapter error: {0}")]
52    Adapter(String),
53
54    #[error("serialization error: {0}")]
55    Serialization(#[from] serde_json::Error),
56
57    #[error("invalid configuration: {0}")]
58    Config(String),
59
60    #[error("mesh transport not available")]
61    NoMesh,
62
63    /// Stream's per-stream in-flight window is full. The caller's events
64    /// were NOT sent — daemons decide whether to drop, retry, or buffer
65    /// at the app layer. See `Mesh::send_with_retry` / `send_blocking`
66    /// for the two built-in policies.
67    #[error("stream backpressure: queue full")]
68    Backpressure,
69
70    /// Stream's peer session is gone (peer disconnected, never
71    /// connected, or the stream was closed).
72    #[error("stream not connected")]
73    NotConnected,
74
75    /// A publisher's `Ack` rejected a Subscribe / Unsubscribe
76    /// request. `None` means the rejection arrived without a
77    /// structured reason. Gated behind `net` because
78    /// `AckReason` lives in the network-transport surface;
79    /// non-`net` SDK builds (e.g. redis-only consumer helpers)
80    /// don't compile this variant.
81    #[cfg(feature = "net")]
82    #[error("channel membership rejected: {0:?}")]
83    ChannelRejected(Option<net::adapter::net::AckReason>),
84
85    /// NAT-traversal failure — a reflex probe, hole-punch, or
86    /// port-mapping path couldn't complete. Always represents a
87    /// missed *optimization*, never a connectivity failure:
88    /// every NATed peer remains reachable through the routed-
89    /// handshake path. `kind` is a stable discriminator
90    /// matching the `nat-traversal` crate's error vocabulary
91    /// (`reflex-timeout` / `peer-not-reachable` / `transport` /
92    /// `rendezvous-no-relay` / `rendezvous-rejected` /
93    /// `punch-failed` / `port-map-unavailable` / `unsupported`).
94    #[cfg(feature = "nat-traversal")]
95    #[error("traversal {kind}: {message}")]
96    Traversal {
97        /// Stable machine-readable discriminator — same value as
98        /// `TraversalError::kind()`. Never localized; never
99        /// changed once a variant has shipped.
100        kind: &'static str,
101        /// Human-readable detail (e.g. the underlying socket
102        /// error on a `transport` failure). Empty for kinds
103        /// that have no variable detail.
104        message: String,
105    },
106}
107
108#[cfg(feature = "net")]
109impl From<net::adapter::net::StreamError> for SdkError {
110    fn from(e: net::adapter::net::StreamError) -> Self {
111        use net::adapter::net::StreamError;
112        match e {
113            StreamError::Backpressure => SdkError::Backpressure,
114            StreamError::NotConnected => SdkError::NotConnected,
115            StreamError::Transport(msg) => SdkError::Adapter(msg),
116        }
117    }
118}
119
120impl From<net::error::IngestionError> for SdkError {
121    fn from(e: net::error::IngestionError) -> Self {
122        use net::error::IngestionError;
123        // Pre-fix this stringified every variant into
124        // `SdkError::Ingestion(String)`, forcing callers to match
125        // on the message text to distinguish "ring buffer full,
126        // retry with backoff" (Backpressure) from "event dropped
127        // by sampling, retry futile" (Sampled) from "no routable
128        // shard, retry once topology stabilizes" (Unrouted).
129        // Each maps to a structured SdkError variant so the
130        // remediation choice is encoded in the type system.
131        match e {
132            IngestionError::Backpressure => SdkError::Backpressure,
133            IngestionError::Sampled => SdkError::Sampled,
134            IngestionError::Unrouted => SdkError::Unrouted,
135            IngestionError::ShuttingDown => SdkError::Shutdown,
136            IngestionError::Serialization(err) => SdkError::Serialization(err),
137        }
138    }
139}
140
141impl From<net::error::ConsumerError> for SdkError {
142    fn from(e: net::error::ConsumerError) -> Self {
143        SdkError::Poll(e.to_string())
144    }
145}
146
147impl From<net::error::AdapterError> for SdkError {
148    fn from(e: net::error::AdapterError) -> Self {
149        SdkError::Adapter(e.to_string())
150    }
151}
152
153#[cfg(feature = "nat-traversal")]
154impl From<net::adapter::net::traversal::TraversalError> for SdkError {
155    fn from(e: net::adapter::net::traversal::TraversalError) -> Self {
156        SdkError::Traversal {
157            kind: e.kind(),
158            message: e.to_string(),
159        }
160    }
161}
162
163pub type Result<T> = std::result::Result<T, SdkError>;
164
165#[cfg(test)]
166mod tests {
167    use super::*;
168    use net::error::IngestionError;
169
170    /// Each `IngestionError` variant must map to a
171    /// structured `SdkError` so callers don't have to string-
172    /// match the message text to pick a remediation.
173    #[test]
174    fn ingestion_backpressure_maps_to_structured_backpressure() {
175        let sdk: SdkError = IngestionError::Backpressure.into();
176        assert!(
177            matches!(sdk, SdkError::Backpressure),
178            "Backpressure must map to SdkError::Backpressure, got {:?}",
179            sdk
180        );
181    }
182
183    #[test]
184    fn ingestion_sampled_maps_to_structured_sampled() {
185        let sdk: SdkError = IngestionError::Sampled.into();
186        assert!(
187            matches!(sdk, SdkError::Sampled),
188            "Sampled must map to SdkError::Sampled so callers \
189             know retry is pointless; got {:?}",
190            sdk
191        );
192    }
193
194    #[test]
195    fn ingestion_unrouted_maps_to_structured_unrouted() {
196        let sdk: SdkError = IngestionError::Unrouted.into();
197        assert!(
198            matches!(sdk, SdkError::Unrouted),
199            "Unrouted must map to SdkError::Unrouted so callers \
200             know to wait for topology to stabilize; got {:?}",
201            sdk
202        );
203    }
204
205    #[test]
206    fn ingestion_shutdown_maps_to_structured_shutdown() {
207        let sdk: SdkError = IngestionError::ShuttingDown.into();
208        assert!(
209            matches!(sdk, SdkError::Shutdown),
210            "ShuttingDown must reuse SdkError::Shutdown, got {:?}",
211            sdk
212        );
213    }
214
215    #[test]
216    fn ingestion_serialization_preserves_structured_serialization() {
217        // Construct an IngestionError::Serialization carrying a
218        // real serde_json error so the From conversion has
219        // something to unwrap.
220        let parse_err: serde_json::Error =
221            serde_json::from_str::<serde_json::Value>("{ this is not json").unwrap_err();
222        let sdk: SdkError = IngestionError::Serialization(parse_err).into();
223        assert!(
224            matches!(sdk, SdkError::Serialization(_)),
225            "Serialization must keep its #[from] serde_json::Error, got {:?}",
226            sdk
227        );
228    }
229}