use std::sync::Arc;
use apollo_federation::connectors::Connector;
use indexmap::IndexMap;
use crate::Context;
use crate::query_planner::PlanNode;
type ConnectorsByServiceName = Arc<IndexMap<Arc<str>, Connector>>;
pub(crate) fn store_connectors(
context: &Context,
connectors_by_service_name: Arc<IndexMap<Arc<str>, Connector>>,
) {
context
.extensions()
.with_lock(|lock| lock.insert::<ConnectorsByServiceName>(connectors_by_service_name));
}
pub(crate) fn get_connectors(context: &Context) -> Option<ConnectorsByServiceName> {
context
.extensions()
.with_lock(|lock| lock.get::<ConnectorsByServiceName>().cloned())
}
type ConnectorLabels = Arc<IndexMap<Arc<str>, String>>;
pub(crate) fn store_connectors_labels(context: &Context, labels_by_service_name: ConnectorLabels) {
context
.extensions()
.with_lock(|lock| lock.insert(labels_by_service_name));
}
pub(crate) fn replace_connector_service_names_text(
text: Option<Arc<String>>,
context: &Context,
) -> Option<Arc<String>> {
let replacements = context
.extensions()
.with_lock(|lock| lock.get::<ConnectorLabels>().cloned());
if let Some(replacements) = replacements {
text.as_ref().map(|text| {
let mut text = text.to_string();
for (service_name, label) in replacements.iter() {
text = text.replace(&**service_name, label.as_ref());
}
Arc::new(text)
})
} else {
text
}
}
pub(crate) fn replace_connector_service_names(
plan: Arc<PlanNode>,
context: &Context,
) -> Arc<PlanNode> {
let replacements = context
.extensions()
.with_lock(|lock| lock.get::<ConnectorLabels>().cloned());
return if let Some(replacements) = replacements {
let mut plan = plan.clone();
recurse(Arc::make_mut(&mut plan), &replacements);
plan
} else {
plan
};
fn recurse(plan: &mut PlanNode, replacements: &IndexMap<Arc<str>, String>) {
match plan {
PlanNode::Sequence { nodes } => {
for node in nodes {
recurse(node, replacements);
}
}
PlanNode::Parallel { nodes } => {
for node in nodes {
recurse(node, replacements);
}
}
PlanNode::Fetch(node) => {
if let Some(service_name) = replacements.get(&node.service_name) {
node.service_name = service_name.clone().into();
}
}
PlanNode::Flatten(flatten) => {
recurse(&mut flatten.node, replacements);
}
PlanNode::Defer { primary, deferred } => {
if let Some(primary) = primary.node.as_mut() {
recurse(primary, replacements);
}
for deferred in deferred {
if let Some(node) = &mut deferred.node {
recurse(Arc::make_mut(node), replacements);
}
}
}
PlanNode::Subscription { primary: _, rest } => {
if let Some(node) = rest {
recurse(node, replacements);
}
}
PlanNode::Condition {
if_clause,
else_clause,
..
} => {
if let Some(if_clause) = if_clause.as_mut() {
recurse(if_clause, replacements);
}
if let Some(else_clause) = else_clause.as_mut() {
recurse(else_clause, replacements);
}
}
}
}
}