Skip to main content

apollo_federation/connectors/runtime/
errors.rs

1use serde::Serialize;
2use serde_json_bytes::ByteString;
3use serde_json_bytes::Map;
4use serde_json_bytes::Value;
5
6use crate::connectors::Connector;
7use crate::connectors::runtime::key::ResponseKey;
8
9#[derive(Clone, Debug, Serialize)]
10pub struct RuntimeError {
11    pub message: String,
12    code: Option<String>,
13    pub coordinate: Option<String>,
14    pub subgraph_name: Option<String>,
15    pub path: String,
16    pub extensions: Map<ByteString, Value>,
17    /// Tracks whether a span event has already been emitted for this error at its source site
18    /// (e.g. in `process_response`). When set, the centralized catch-all in `count_operation_errors`
19    /// skips re-emitting so traces carry exactly one event per error.
20    #[serde(skip)]
21    span_event_emitted: bool,
22}
23
24impl RuntimeError {
25    pub fn new(message: impl Into<String>, response_key: &ResponseKey) -> Self {
26        Self {
27            message: message.into(),
28            code: None,
29            coordinate: None,
30            subgraph_name: None,
31            path: response_key.path_string(),
32            extensions: Default::default(),
33            span_event_emitted: false,
34        }
35    }
36
37    pub fn extensions(&self) -> Map<ByteString, Value> {
38        let mut extensions = Map::default();
39        extensions
40            .entry("code")
41            .or_insert_with(|| self.code().into());
42        if let Some(subgraph_name) = &self.subgraph_name {
43            extensions
44                .entry("service")
45                .or_insert_with(|| Value::String(subgraph_name.clone().into()));
46        };
47
48        if let Some(coordinate) = &self.coordinate {
49            extensions.entry("connector").or_insert_with(|| {
50                Value::Object(Map::from_iter([(
51                    "coordinate".into(),
52                    Value::String(coordinate.to_string().into()),
53                )]))
54            });
55        }
56
57        extensions.extend(self.extensions.clone());
58        extensions
59    }
60
61    pub fn extension<K, V>(mut self, key: K, value: V) -> Self
62    where
63        K: Into<ByteString>,
64        V: Into<Value>,
65    {
66        self.extensions.insert(key.into(), value.into());
67        self
68    }
69
70    /// Like [`Self::extension`], but if the existing value at `key` and the
71    /// incoming value are both objects, their fields are merged recursively
72    /// rather than the existing value being replaced wholesale. Used so a
73    /// user-supplied `errors.extensions: "http: { myField: ... }"` mapping
74    /// preserves the default `http.status` field set by `map_error`. See
75    /// the errors-as-data docs at
76    /// <https://www.apollographql.com/docs/graphos/connectors/responses/error-handling>
77    /// for the contract that defaults are retained alongside user fields.
78    pub fn merge_extension<K, V>(mut self, key: K, value: V) -> Self
79    where
80        K: Into<ByteString>,
81        V: Into<Value>,
82    {
83        let key = key.into();
84        let value = value.into();
85        // Deep-merge in place when both sides are objects, otherwise overwrite
86        // in place. Both paths preserve the existing key's position in the
87        // insertion-ordered map so user-visible ordering doesn't shift just
88        // because we touched the value.
89        match self.extensions.get_mut(&key) {
90            Some(existing) => match (existing, value) {
91                (Value::Object(existing), Value::Object(incoming)) => {
92                    deep_merge_objects(existing, incoming);
93                }
94                (slot, incoming) => {
95                    *slot = incoming;
96                }
97            },
98            None => {
99                self.extensions.insert(key, value);
100            }
101        }
102        self
103    }
104
105    pub fn with_code(mut self, code: impl Into<String>) -> Self {
106        self.code = Some(code.into());
107        self
108    }
109
110    pub fn code(&self) -> &str {
111        self.code.as_deref().unwrap_or("CONNECTORS_FETCH")
112    }
113
114    pub fn span_event_emitted(&self) -> bool {
115        self.span_event_emitted
116    }
117
118    pub fn set_span_event_emitted(&mut self, value: bool) {
119        self.span_event_emitted = value;
120    }
121}
122
123/// Merge `src` into `dest` recursively: when both sides have an object at the
124/// same key, descend; otherwise the value from `src` wins. Used by
125/// [`RuntimeError::merge_extension`].
126fn deep_merge_objects(dest: &mut Map<ByteString, Value>, src: Map<ByteString, Value>) {
127    for (key, value) in src {
128        match dest.get_mut(&key) {
129            Some(existing) => match (existing, value) {
130                (Value::Object(existing), Value::Object(incoming)) => {
131                    deep_merge_objects(existing, incoming);
132                }
133                (slot, incoming) => {
134                    *slot = incoming;
135                }
136            },
137            None => {
138                dest.insert(key, value);
139            }
140        }
141    }
142}
143
144/// An error sending a connector request. This represents a problem with sending the request
145/// to the connector, rather than an error returned from the connector itself.
146#[derive(Debug, Clone, thiserror::Error)]
147pub enum Error {
148    #[error("Request limit exceeded")]
149    RequestLimitExceeded,
150
151    #[error("Rate limit exceeded")]
152    RateLimited,
153
154    #[error("Gateway timeout")]
155    GatewayTimeout,
156
157    #[error("Connector error: {0}")]
158    TransportFailure(String),
159}
160
161impl Error {
162    pub fn to_runtime_error(
163        &self,
164        connector: &Connector,
165        response_key: &ResponseKey,
166    ) -> RuntimeError {
167        RuntimeError {
168            message: self.to_string(),
169            code: Some(self.code().to_string()),
170            coordinate: Some(connector.id.coordinate()),
171            subgraph_name: Some(connector.id.subgraph_name.clone()),
172            path: response_key.path_string(),
173            extensions: Default::default(),
174            span_event_emitted: false,
175        }
176    }
177
178    pub fn code(&self) -> &'static str {
179        match self {
180            Self::RequestLimitExceeded => "REQUEST_LIMIT_EXCEEDED",
181            Self::RateLimited => "REQUEST_RATE_LIMITED",
182            Self::GatewayTimeout => "GATEWAY_TIMEOUT",
183            Self::TransportFailure(_) => "HTTP_CLIENT_ERROR",
184        }
185    }
186}