apollo_router/query_planner/
subscription.rs1use std::sync::Arc;
2use std::sync::atomic::AtomicUsize;
3
4use apollo_federation::query_plan::serializable_document::SerializableDocument;
5use serde::Deserialize;
6use serde::Serialize;
7use tokio::sync::broadcast;
8
9use super::OperationKind;
10use super::fetch::SubgraphSchemas;
11use super::rewrites;
12use crate::error::ValidationErrors;
13use crate::plugins::subscription::SubscriptionTaskParams;
14
/// Name string for the "subscription event" span.
// NOTE(review): presumably passed to the tracing layer when a subscription
// event is processed — the usage site is not in this file; confirm.
pub(crate) const SUBSCRIPTION_EVENT_SPAN_NAME: &str = "subscription_event";

/// Process-wide atomic counter, initialised to zero.
// NOTE(review): the name suggests it tracks currently opened subscriptions;
// it is only declared here, the increments/decrements live elsewhere.
pub(crate) static OPENED_SUBSCRIPTIONS: AtomicUsize = AtomicUsize::new(0);
17pub(crate) struct SubscriptionHandle {
18 pub(crate) closed_signal: broadcast::Receiver<()>,
19 pub(crate) subscription_conf_tx: Option<tokio::sync::mpsc::Sender<SubscriptionTaskParams>>,
20}
21
22impl Clone for SubscriptionHandle {
23 fn clone(&self) -> Self {
24 Self {
25 closed_signal: self.closed_signal.resubscribe(),
26 subscription_conf_tx: self.subscription_conf_tx.clone(),
27 }
28 }
29}
30
31impl SubscriptionHandle {
32 pub(crate) fn new(
33 closed_signal: broadcast::Receiver<()>,
34 subscription_conf_tx: Option<tokio::sync::mpsc::Sender<SubscriptionTaskParams>>,
35 ) -> Self {
36 Self {
37 closed_signal,
38 subscription_conf_tx,
39 }
40 }
41}
42
43#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
44#[serde(rename_all = "camelCase")]
45pub(crate) struct SubscriptionNode {
46 pub(crate) service_name: Arc<str>,
48
49 pub(crate) variable_usages: Vec<Arc<str>>,
51
52 pub(crate) operation: SerializableDocument,
54
55 pub(crate) operation_name: Option<Arc<str>>,
57
58 pub(crate) operation_kind: OperationKind,
60
61 pub(crate) input_rewrites: Option<Vec<rewrites::DataRewrite>>,
63
64 pub(crate) output_rewrites: Option<Vec<rewrites::DataRewrite>>,
66}
67
68impl SubscriptionNode {
69 pub(crate) fn init_parsed_operation(
70 &mut self,
71 subgraph_schemas: &SubgraphSchemas,
72 ) -> Result<(), ValidationErrors> {
73 let schema = &subgraph_schemas[self.service_name.as_ref()];
74 self.operation.init_parsed(&schema.schema)?;
75 Ok(())
76 }
77}