apollo-federation 2.16.0

Apollo Federation
Documentation
use serde::Serialize;
use serde_json_bytes::ByteString;
use serde_json_bytes::Map;
use serde_json_bytes::Value;

use crate::connectors::Connector;
use crate::connectors::runtime::key::ResponseKey;

/// An error record made up of a message, an optional code, optional connector
/// coordinate and subgraph name, a response path, and a map of extension entries.
///
/// Every field except `span_event_emitted` takes part in the derived `Serialize` output.
#[derive(Clone, Debug, Serialize)]
pub struct RuntimeError {
    /// The error message text.
    pub message: String,
    /// Optional error code. When unset, [`RuntimeError::code`] falls back to
    /// `"CONNECTORS_FETCH"`.
    code: Option<String>,
    /// Optional connector coordinate. When set, [`RuntimeError::extensions`] reports it
    /// under `connector.coordinate`.
    pub coordinate: Option<String>,
    /// Optional subgraph name. When set, [`RuntimeError::extensions`] reports it under
    /// the `service` key.
    pub subgraph_name: Option<String>,
    /// Path string for the error; the constructors in this file fill it from
    /// `ResponseKey::path_string`.
    pub path: String,
    /// Extra extension entries. [`RuntimeError::extensions`] applies these on top of the
    /// default entries, so an entry here replaces a default with the same key.
    pub extensions: Map<ByteString, Value>,
    /// Tracks whether a span event has already been emitted for this error at its source site
    /// (e.g. in `process_response`). When set, the centralized catch-all in `count_operation_errors`
    /// skips re-emitting so traces carry exactly one event per error.
    #[serde(skip)]
    span_event_emitted: bool,
}

impl RuntimeError {
    pub fn new(message: impl Into<String>, response_key: &ResponseKey) -> Self {
        Self {
            message: message.into(),
            code: None,
            coordinate: None,
            subgraph_name: None,
            path: response_key.path_string(),
            extensions: Default::default(),
            span_event_emitted: false,
        }
    }

    pub fn extensions(&self) -> Map<ByteString, Value> {
        let mut extensions = Map::default();
        extensions
            .entry("code")
            .or_insert_with(|| self.code().into());
        if let Some(subgraph_name) = &self.subgraph_name {
            extensions
                .entry("service")
                .or_insert_with(|| Value::String(subgraph_name.clone().into()));
        };

        if let Some(coordinate) = &self.coordinate {
            extensions.entry("connector").or_insert_with(|| {
                Value::Object(Map::from_iter([(
                    "coordinate".into(),
                    Value::String(coordinate.to_string().into()),
                )]))
            });
        }

        extensions.extend(self.extensions.clone());
        extensions
    }

    pub fn extension<K, V>(mut self, key: K, value: V) -> Self
    where
        K: Into<ByteString>,
        V: Into<Value>,
    {
        self.extensions.insert(key.into(), value.into());
        self
    }

    /// Like [`Self::extension`], but if the existing value at `key` and the
    /// incoming value are both objects, their fields are merged recursively
    /// rather than the existing value being replaced wholesale. Used so a
    /// user-supplied `errors.extensions: "http: { myField: ... }"` mapping
    /// preserves the default `http.status` field set by `map_error`. See
    /// the errors-as-data docs at
    /// <https://www.apollographql.com/docs/graphos/connectors/responses/error-handling>
    /// for the contract that defaults are retained alongside user fields.
    pub fn merge_extension<K, V>(mut self, key: K, value: V) -> Self
    where
        K: Into<ByteString>,
        V: Into<Value>,
    {
        let key = key.into();
        let value = value.into();
        // Deep-merge in place when both sides are objects, otherwise overwrite
        // in place. Both paths preserve the existing key's position in the
        // insertion-ordered map so user-visible ordering doesn't shift just
        // because we touched the value.
        match self.extensions.get_mut(&key) {
            Some(existing) => match (existing, value) {
                (Value::Object(existing), Value::Object(incoming)) => {
                    deep_merge_objects(existing, incoming);
                }
                (slot, incoming) => {
                    *slot = incoming;
                }
            },
            None => {
                self.extensions.insert(key, value);
            }
        }
        self
    }

    pub fn with_code(mut self, code: impl Into<String>) -> Self {
        self.code = Some(code.into());
        self
    }

    pub fn code(&self) -> &str {
        self.code.as_deref().unwrap_or("CONNECTORS_FETCH")
    }

    pub fn span_event_emitted(&self) -> bool {
        self.span_event_emitted
    }

    pub fn set_span_event_emitted(&mut self, value: bool) {
        self.span_event_emitted = value;
    }
}

/// Recursively folds `src` into `dest`.
///
/// For each entry of `src`: if `dest` has no value at that key the entry is inserted;
/// if both sides hold an object at that key the merge descends into them; otherwise
/// the value from `src` replaces the one in `dest`. Used by
/// [`RuntimeError::merge_extension`].
fn deep_merge_objects(dest: &mut Map<ByteString, Value>, src: Map<ByteString, Value>) {
    for (field, incoming) in src {
        // Key absent from `dest`: take the incoming value as-is.
        let Some(slot) = dest.get_mut(&field) else {
            dest.insert(field, incoming);
            continue;
        };

        match (slot, incoming) {
            // Object meets object: merge field by field instead of replacing.
            (Value::Object(current), Value::Object(nested)) => {
                deep_merge_objects(current, nested);
            }
            // Any other combination: the incoming value wins.
            (target, replacement) => {
                *target = replacement;
            }
        }
    }
}

/// An error sending a connector request. This represents a problem with sending the request
/// to the connector, rather than an error returned from the connector itself.
///
/// Each variant's `#[error(...)]` text is its `Display` output, which
/// [`Error::to_runtime_error`] uses as the resulting message; [`Error::code`] supplies the
/// matching error code.
#[derive(Debug, Clone, thiserror::Error)]
pub enum Error {
    /// Reported with code `REQUEST_LIMIT_EXCEEDED`.
    #[error("Request limit exceeded")]
    RequestLimitExceeded,

    /// Reported with code `REQUEST_RATE_LIMITED`.
    #[error("Rate limit exceeded")]
    RateLimited,

    /// Reported with code `GATEWAY_TIMEOUT`.
    #[error("Gateway timeout")]
    GatewayTimeout,

    /// Reported with code `HTTP_CLIENT_ERROR`. The wrapped string is interpolated into
    /// the display message after the `Connector error: ` prefix.
    #[error("Connector error: {0}")]
    TransportFailure(String),
}

impl Error {
    pub fn to_runtime_error(
        &self,
        connector: &Connector,
        response_key: &ResponseKey,
    ) -> RuntimeError {
        RuntimeError {
            message: self.to_string(),
            code: Some(self.code().to_string()),
            coordinate: Some(connector.id.coordinate()),
            subgraph_name: Some(connector.id.subgraph_name.clone()),
            path: response_key.path_string(),
            extensions: Default::default(),
            span_event_emitted: false,
        }
    }

    pub fn code(&self) -> &'static str {
        match self {
            Self::RequestLimitExceeded => "REQUEST_LIMIT_EXCEEDED",
            Self::RateLimited => "REQUEST_RATE_LIMITED",
            Self::GatewayTimeout => "GATEWAY_TIMEOUT",
            Self::TransportFailure(_) => "HTTP_CLIENT_ERROR",
        }
    }
}