Skip to main content

apollo_router/query_planner/
subscription.rs

1use std::sync::Arc;
2use std::sync::atomic::AtomicUsize;
3
4use apollo_federation::query_plan::serializable_document::SerializableDocument;
5use serde::Deserialize;
6use serde::Serialize;
7use tokio::sync::broadcast;
8
9use super::OperationKind;
10use super::fetch::SubgraphSchemas;
11use super::rewrites;
12use crate::error::ValidationErrors;
13use crate::plugins::subscription::SubscriptionTaskParams;
14
/// Name given to the span that covers a single subscription event.
///
/// NOTE(review): the span itself is created outside this file — presumably a
/// `tracing` span; confirm against the call sites.
pub(crate) const SUBSCRIPTION_EVENT_SPAN_NAME: &str = "subscription_event";
/// Global atomic counter, initialised to zero.
///
/// NOTE(review): judging by the name this tracks how many subscriptions are
/// currently open; the code that increments/decrements it is not in this file — confirm.
pub(crate) static OPENED_SUBSCRIPTIONS: AtomicUsize = AtomicUsize::new(0);
/// Handle bundling the two channel endpoints associated with a subscription.
///
/// `Clone` is implemented by hand for this type (a `broadcast::Receiver` is
/// duplicated through `resubscribe()` rather than `clone()`).
pub(crate) struct SubscriptionHandle {
    /// Receiving side of a broadcast channel carrying unit values.
    /// Presumably a value is sent when the subscription is closed — confirm against the sender.
    pub(crate) closed_signal: broadcast::Receiver<()>,
    /// Optional sender used to hand [`SubscriptionTaskParams`] to another task.
    /// `None` when no such channel was provided at construction time.
    pub(crate) subscription_conf_tx: Option<tokio::sync::mpsc::Sender<SubscriptionTaskParams>>,
}
21
22impl Clone for SubscriptionHandle {
23    fn clone(&self) -> Self {
24        Self {
25            closed_signal: self.closed_signal.resubscribe(),
26            subscription_conf_tx: self.subscription_conf_tx.clone(),
27        }
28    }
29}
30
impl SubscriptionHandle {
    /// Builds a handle from its two channel endpoints.
    ///
    /// Both arguments are stored unchanged in the fields of the same name;
    /// pass `None` for `subscription_conf_tx` when there is no sender to attach.
    pub(crate) fn new(
        closed_signal: broadcast::Receiver<()>,
        subscription_conf_tx: Option<tokio::sync::mpsc::Sender<SubscriptionTaskParams>>,
    ) -> Self {
        Self {
            closed_signal,
            subscription_conf_tx,
        }
    }
}
42
/// Query plan node describing a subscription operation sent to a single subgraph.
///
/// Serialized and deserialized with camelCase field names.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct SubscriptionNode {
    /// The name of the service or subgraph that the subscription is querying.
    pub(crate) service_name: Arc<str>,

    /// The variables that are used for the subgraph subscription.
    pub(crate) variable_usages: Vec<Arc<str>>,

    /// The GraphQL subquery that is used for the subscription.
    pub(crate) operation: SerializableDocument,

    /// The GraphQL subquery operation name, if the operation is named.
    pub(crate) operation_name: Option<Arc<str>>,

    /// The GraphQL operation kind of the subquery.
    pub(crate) operation_kind: OperationKind,

    /// Optionally describes a number of "rewrites" that query plan executors
    /// should apply to the data that is sent as input of this subscription.
    pub(crate) input_rewrites: Option<Vec<rewrites::DataRewrite>>,

    /// Optionally describes a number of "rewrites" to apply to the data that
    /// is received from a subscription (and before it is applied to the
    /// current in-memory results).
    pub(crate) output_rewrites: Option<Vec<rewrites::DataRewrite>>,
}
67
68impl SubscriptionNode {
69    pub(crate) fn init_parsed_operation(
70        &mut self,
71        subgraph_schemas: &SubgraphSchemas,
72    ) -> Result<(), ValidationErrors> {
73        let schema = &subgraph_schemas[self.service_name.as_ref()];
74        self.operation.init_parsed(&schema.schema)?;
75        Ok(())
76    }
77}