Skip to main content

zerodds_amqp_endpoint/
errors.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
4//! AMQP-Error-Code-Mapping + Diagnostic-Description-Format.
5//!
6//! Spec-Quellen:
7//! * dds-amqp-1.0 §11.1 Encode/Decode Failures.
8//! * §11.2 Connection / Session / Link Lifecycle Failures.
9//! * §11.3 Instance-Lifecycle Failures.
10//! * §11.4 Diagnostic Description Strings (Format
11//!   `<spec-section>: <human-readable>`).
12//!
13//! Ergaenzt §7.5.1 — `ResolutionError::NoRoute` ↔
14//! `amqp:not-found`-Link-Error wenn `permit_dynamic_topics =
15//! false`.
16
17use alloc::string::String;
18use core::fmt;
19
20use crate::mapping::MappingError;
21use crate::routing::ResolutionError;
22
23/// Spec §11 — Disposition-Scope eines Errors.
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub enum ErrorScope {
26    /// Per-Transfer Disposition `rejected`. Link bleibt offen.
27    Transfer,
28    /// Link-Error. Per Spec auch der entsprechende Detach.
29    Link,
30    /// Connection-Close mit Frame.
31    Connection,
32}
33
34/// Spec §11 — kanonische AMQP-Error-Conditions, die der Endpoint
35/// emittiert.
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub enum AmqpErrorCondition {
38    /// `amqp:decode-error` — XCDR2-Decode/Body-Mismatch/Type-
39    /// Collision.
40    DecodeError,
41    /// `amqp:not-implemented` — durability=unsettled-state,
42    /// unbekannter `dds:operation`-Wert, etc.
43    NotImplemented,
44    /// `amqp:not-found` — Address nicht im Catalog und
45    /// `permit_dynamic_topics = false`.
46    NotFound,
47    /// `amqp:resource-limit-exceeded` — max-connections,
48    /// catalog-cap, max-frame-size, idle-timeout.
49    ResourceLimitExceeded,
50    /// `amqp:unauthorized-access` — DDS-Security AccessControl-
51    /// Plugin lehnt ab.
52    UnauthorizedAccess,
53    /// `amqp:precondition-failed` — `dds:operation = unregister`
54    /// auf unbekannter Instanz, Reply-Validation D.4.1
55    /// `correlation-id absent`.
56    PreconditionFailed,
57    /// `amqp:connection:framing-error` — Frame-Size ueberschreitet
58    /// `max-frame-size`.
59    FramingError,
60}
61
62impl AmqpErrorCondition {
63    /// Spec-konformer Wire-String fuer das `error.condition`-Feld.
64    #[must_use]
65    pub const fn as_symbol(self) -> &'static str {
66        match self {
67            Self::DecodeError => "amqp:decode-error",
68            Self::NotImplemented => "amqp:not-implemented",
69            Self::NotFound => "amqp:not-found",
70            Self::ResourceLimitExceeded => "amqp:resource-limit-exceeded",
71            Self::UnauthorizedAccess => "amqp:unauthorized-access",
72            Self::PreconditionFailed => "amqp:precondition-failed",
73            Self::FramingError => "amqp:connection:framing-error",
74        }
75    }
76}
77
78/// Spec §11.4 — `error.description`-String mit kanonischem
79/// `<spec-section>: <human-text>`-Format.
80#[derive(Debug, Clone, PartialEq, Eq)]
81pub struct ErrorDescription {
82    /// `<spec-section>` (z.B. `§7.2.1.3` oder `Annex D §D.4.1`).
83    pub spec_section: String,
84    /// Mensch-lesbarer Text (Englisch; Spec verlangt ASCII-only
85    /// nicht, aber die OMG-Konvention ist Englisch).
86    pub message: String,
87}
88
89impl ErrorDescription {
90    /// Konstruktor.
91    pub fn new(spec_section: impl Into<String>, message: impl Into<String>) -> Self {
92        Self {
93            spec_section: spec_section.into(),
94            message: message.into(),
95        }
96    }
97
98    /// Spec §11.4 — Wire-String `<§-Ref>: <Text>`.
99    #[must_use]
100    pub fn render(&self) -> String {
101        alloc::format!("{}: {}", self.spec_section, self.message)
102    }
103}
104
105impl fmt::Display for ErrorDescription {
106    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107        write!(f, "{}: {}", self.spec_section, self.message)
108    }
109}
110
111/// Spec §11 — vollstaendige Error-Information, die der Endpoint
112/// ueber AMQP zurueckmeldet.
113#[derive(Debug, Clone, PartialEq, Eq)]
114pub struct AmqpError {
115    /// AMQP-Wire-Error-Condition.
116    pub condition: AmqpErrorCondition,
117    /// Disposition-Scope (Transfer / Link / Connection).
118    pub scope: ErrorScope,
119    /// Diagnostic-Description fuer den `error.description`-String.
120    pub description: ErrorDescription,
121}
122
123impl AmqpError {
124    /// Konstruktor.
125    pub fn new(
126        condition: AmqpErrorCondition,
127        scope: ErrorScope,
128        description: ErrorDescription,
129    ) -> Self {
130        Self {
131            condition,
132            scope,
133            description,
134        }
135    }
136}
137
138// ============================================================
139// Spec-Mapping-Helpers
140// ============================================================
141
142/// Spec §11.1 — `MappingError` aus dem Body-Encoding-Pfad auf
143/// AMQP-Errors abbilden.
144#[must_use]
145pub fn map_mapping_error(err: &MappingError) -> AmqpError {
146    match err {
147        MappingError::InvalidUtf8 => AmqpError::new(
148            AmqpErrorCondition::DecodeError,
149            ErrorScope::Transfer,
150            ErrorDescription::new("§11.1", "JSON body is not valid UTF-8"),
151        ),
152        MappingError::InvalidJson(msg) => AmqpError::new(
153            AmqpErrorCondition::DecodeError,
154            ErrorScope::Transfer,
155            ErrorDescription::new("§11.1", alloc::format!("JSON body parse error: {msg}")),
156        ),
157        MappingError::EmptyBody => AmqpError::new(
158            AmqpErrorCondition::DecodeError,
159            ErrorScope::Transfer,
160            ErrorDescription::new("§11.1", "body section is empty"),
161        ),
162    }
163}
164
165/// Spec §7.5.1 — `ResolutionError::NoRoute` auf
166/// `amqp:not-found`-Link-Error abbilden, wenn
167/// `permit_dynamic_topics = false`.
168#[must_use]
169pub fn map_resolution_error(err: &ResolutionError, permit_dynamic_topics: bool) -> AmqpError {
170    match err {
171        ResolutionError::NoRoute(addr) => {
172            if permit_dynamic_topics {
173                // permit_dynamic_topics=true: Caller soll on-the-fly
174                // anlegen; hier melden wir es als NotFound mit
175                // Hinweis auf §7.5.1, das Caller-Verhalten regelt.
176                AmqpError::new(
177                    AmqpErrorCondition::NotFound,
178                    ErrorScope::Link,
179                    ErrorDescription::new(
180                        "§7.5.1",
181                        alloc::format!(
182                            "address '{addr}' not in catalog (dynamic-topic creation enabled)"
183                        ),
184                    ),
185                )
186            } else {
187                AmqpError::new(
188                    AmqpErrorCondition::NotFound,
189                    ErrorScope::Link,
190                    ErrorDescription::new(
191                        "§7.5.1",
192                        alloc::format!(
193                            "address '{addr}' not in catalog and permit_dynamic_topics = false"
194                        ),
195                    ),
196                )
197            }
198        }
199        ResolutionError::Malformed(addr) => AmqpError::new(
200            AmqpErrorCondition::DecodeError,
201            ErrorScope::Link,
202            ErrorDescription::new("§7.3", alloc::format!("malformed AMQP address '{addr}'")),
203        ),
204    }
205}
206
207/// Spec §11.2 — Resource-Limit-Verletzungen.
208#[must_use]
209pub fn resource_limit_exceeded(
210    spec_section: impl Into<String>,
211    message: impl Into<String>,
212) -> AmqpError {
213    AmqpError::new(
214        AmqpErrorCondition::ResourceLimitExceeded,
215        ErrorScope::Connection,
216        ErrorDescription::new(spec_section, message),
217    )
218}
219
220/// Spec §11.2 — `terminus.durable=unsettled-state`-Reject.
221#[must_use]
222pub fn unsettled_state_not_implemented() -> AmqpError {
223    AmqpError::new(
224        AmqpErrorCondition::NotImplemented,
225        ErrorScope::Link,
226        ErrorDescription::new(
227            "§7.4.2",
228            "terminus.durable = unsettled-state requires broker functionality (out of scope)",
229        ),
230    )
231}
232
233/// Spec §11.2 — Unbekannter `dds:operation`-Wert.
234#[must_use]
235pub fn unknown_dds_operation(value: &str) -> AmqpError {
236    AmqpError::new(
237        AmqpErrorCondition::NotImplemented,
238        ErrorScope::Link,
239        ErrorDescription::new(
240            "§7.7",
241            alloc::format!("unknown dds:operation value '{value}'"),
242        ),
243    )
244}
245
246/// Spec §11.3 — Instance-Lifecycle-Failure: `unregister`/`dispose`
247/// auf unbekannter Instanz.
248#[must_use]
249pub fn instance_unknown(op: &str, key: &str) -> AmqpError {
250    AmqpError::new(
251        AmqpErrorCondition::PreconditionFailed,
252        ErrorScope::Transfer,
253        ErrorDescription::new(
254            "§11.3",
255            alloc::format!("dds:operation = {op} on unknown instance key '{key}'"),
256        ),
257    )
258}
259
260/// Spec §11.3 — `register` ohne Key-Felder im Body.
261#[must_use]
262pub fn register_missing_key() -> AmqpError {
263    AmqpError::new(
264        AmqpErrorCondition::DecodeError,
265        ErrorScope::Transfer,
266        ErrorDescription::new(
267            "§11.3",
268            "dds:operation = register but body lacks key fields",
269        ),
270    )
271}
272
273/// Spec §11.2 — DDS-Security AccessControl-Reject.
274#[must_use]
275pub fn access_denied(subject: &str, address: &str) -> AmqpError {
276    AmqpError::new(
277        AmqpErrorCondition::UnauthorizedAccess,
278        ErrorScope::Link,
279        ErrorDescription::new(
280            "§10.3.3",
281            alloc::format!("subject '{subject}' denied access to '{address}'"),
282        ),
283    )
284}
285
286#[cfg(test)]
287#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
288mod tests {
289    use super::*;
290
291    // --- Symbols ---
292
293    #[test]
294    fn condition_symbols_match_spec() {
295        assert_eq!(
296            AmqpErrorCondition::DecodeError.as_symbol(),
297            "amqp:decode-error"
298        );
299        assert_eq!(
300            AmqpErrorCondition::NotImplemented.as_symbol(),
301            "amqp:not-implemented"
302        );
303        assert_eq!(AmqpErrorCondition::NotFound.as_symbol(), "amqp:not-found");
304        assert_eq!(
305            AmqpErrorCondition::ResourceLimitExceeded.as_symbol(),
306            "amqp:resource-limit-exceeded"
307        );
308        assert_eq!(
309            AmqpErrorCondition::UnauthorizedAccess.as_symbol(),
310            "amqp:unauthorized-access"
311        );
312        assert_eq!(
313            AmqpErrorCondition::PreconditionFailed.as_symbol(),
314            "amqp:precondition-failed"
315        );
316        assert_eq!(
317            AmqpErrorCondition::FramingError.as_symbol(),
318            "amqp:connection:framing-error"
319        );
320    }
321
322    // --- §11.4 Diagnostic-Format ---
323
324    #[test]
325    fn description_renders_spec_section_then_text() {
326        let d = ErrorDescription::new("§7.2.1.3", "type-id collision detected");
327        assert_eq!(d.render(), "§7.2.1.3: type-id collision detected");
328    }
329
330    #[test]
331    fn description_display_matches_render() {
332        let d = ErrorDescription::new("§D.4.1", "correlation-id absent");
333        assert_eq!(alloc::format!("{d}"), d.render());
334    }
335
336    // --- §11.1 Mapping-Error-Mapper ---
337
338    #[test]
339    fn invalid_utf8_maps_to_decode_error_transfer() {
340        let e = map_mapping_error(&MappingError::InvalidUtf8);
341        assert_eq!(e.condition, AmqpErrorCondition::DecodeError);
342        assert_eq!(e.scope, ErrorScope::Transfer);
343        assert!(e.description.spec_section.contains("§11.1"));
344    }
345
346    #[test]
347    fn invalid_json_maps_to_decode_error() {
348        let e = map_mapping_error(&MappingError::InvalidJson("bad token".into()));
349        assert_eq!(e.condition, AmqpErrorCondition::DecodeError);
350        assert!(e.description.message.contains("bad token"));
351    }
352
353    #[test]
354    fn empty_body_maps_to_decode_error() {
355        let e = map_mapping_error(&MappingError::EmptyBody);
356        assert_eq!(e.condition, AmqpErrorCondition::DecodeError);
357    }
358
359    // --- §7.5.1 NoRoute-Mapping ---
360
361    #[test]
362    fn no_route_with_dynamic_disabled_yields_not_found_link() {
363        let e = map_resolution_error(&ResolutionError::NoRoute("X".into()), false);
364        assert_eq!(e.condition, AmqpErrorCondition::NotFound);
365        assert_eq!(e.scope, ErrorScope::Link);
366        assert!(
367            e.description
368                .message
369                .contains("permit_dynamic_topics = false")
370        );
371        assert!(e.description.spec_section.contains("§7.5.1"));
372    }
373
374    #[test]
375    fn no_route_with_dynamic_enabled_still_not_found() {
376        // Spec §7.5.1 — `true`-Pfad wird Caller-seitig in
377        // dynamic-topic-creation umgesetzt; wir liefern hier
378        // dieselbe Condition aber andere Description.
379        let e = map_resolution_error(&ResolutionError::NoRoute("X".into()), true);
380        assert_eq!(e.condition, AmqpErrorCondition::NotFound);
381        assert!(
382            e.description
383                .message
384                .contains("dynamic-topic creation enabled")
385        );
386    }
387
388    #[test]
389    fn malformed_address_maps_to_decode_error() {
390        let e = map_resolution_error(&ResolutionError::Malformed("bad://".into()), false);
391        assert_eq!(e.condition, AmqpErrorCondition::DecodeError);
392        assert_eq!(e.scope, ErrorScope::Link);
393    }
394
395    // --- §11.2 Resource-Limits + Durability ---
396
397    #[test]
398    fn resource_limit_exceeded_is_connection_scope() {
399        let e = resource_limit_exceeded("§7.10", "max-connections cap reached");
400        assert_eq!(e.condition, AmqpErrorCondition::ResourceLimitExceeded);
401        assert_eq!(e.scope, ErrorScope::Connection);
402    }
403
404    #[test]
405    fn unsettled_state_yields_not_implemented() {
406        let e = unsettled_state_not_implemented();
407        assert_eq!(e.condition, AmqpErrorCondition::NotImplemented);
408        assert_eq!(e.scope, ErrorScope::Link);
409        assert!(e.description.spec_section.contains("§7.4.2"));
410    }
411
412    #[test]
413    fn unknown_dds_operation_yields_not_implemented() {
414        let e = unknown_dds_operation("teleport");
415        assert_eq!(e.condition, AmqpErrorCondition::NotImplemented);
416        assert!(e.description.message.contains("teleport"));
417    }
418
419    // --- §11.3 Instance-Lifecycle ---
420
421    #[test]
422    fn instance_unknown_yields_precondition_failed() {
423        let e = instance_unknown("unregister", "key-7");
424        assert_eq!(e.condition, AmqpErrorCondition::PreconditionFailed);
425        assert_eq!(e.scope, ErrorScope::Transfer);
426        assert!(e.description.message.contains("key-7"));
427        assert!(e.description.message.contains("unregister"));
428    }
429
430    #[test]
431    fn register_missing_key_yields_decode_error() {
432        let e = register_missing_key();
433        assert_eq!(e.condition, AmqpErrorCondition::DecodeError);
434        assert!(e.description.message.contains("register"));
435    }
436
437    // --- §10.3.3 AccessControl-Reject ---
438
439    #[test]
440    fn access_denied_yields_unauthorized_access_link() {
441        let e = access_denied("CN=eve", "Sensor");
442        assert_eq!(e.condition, AmqpErrorCondition::UnauthorizedAccess);
443        assert_eq!(e.scope, ErrorScope::Link);
444        assert!(e.description.message.contains("CN=eve"));
445        assert!(e.description.message.contains("Sensor"));
446    }
447}