mod metrics;
use std::future::Future;
use std::ops::ControlFlow;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Instant;
use opentelemetry::metrics::MeterProvider as _;
use opentelemetry::metrics::ObservableGauge;
use tokio::sync::oneshot;
use tokio::task_local;
use tracing::Instrument;
use tracing::Span;
use tracing::info_span;
use tracing_core::Dispatch;
use tracing_subscriber::util::SubscriberInitExt;
use self::metrics::JobWatcher;
use self::metrics::Outcome;
use self::metrics::observe_compute_duration;
use self::metrics::observe_queue_wait_duration;
use crate::ageing_priority_queue::AgeingPriorityQueue;
use crate::ageing_priority_queue::Priority;
use crate::ageing_priority_queue::SendError;
use crate::metrics::meter_provider;
use crate::plugins::telemetry::consts::COMPUTE_JOB_EXECUTION_SPAN_NAME;
use crate::plugins::telemetry::consts::COMPUTE_JOB_SPAN_NAME;
// Per-thread soft capacity for the compute job queue; the total capacity is
// this value × pool size (see `queue()`). NOTE(review): the exact "soft"
// semantics are defined by `AgeingPriorityQueue::bounded` — confirm there.
const QUEUE_SOFT_CAPACITY_PER_THREAD: usize = 1_000;
/// Number of OS threads to spawn for the compute job pool.
///
/// Honors the `APOLLO_ROUTER_COMPUTE_THREADS` environment variable when it
/// parses as a positive integer; otherwise falls back to the machine's
/// available parallelism.
///
/// A value of `0` is rejected (previously it was accepted, which would have
/// created a pool with no worker threads and a zero-capacity queue, so queued
/// jobs could never run).
///
/// # Panics
/// Panics if `std::thread::available_parallelism()` fails and no valid
/// override is set.
fn thread_pool_size() -> usize {
    std::env::var("APOLLO_ROUTER_COMPUTE_THREADS")
        .ok()
        .and_then(|value| value.parse::<usize>().ok())
        // Guard against a zero-thread pool: fall back instead.
        .filter(|&threads| threads > 0)
        .unwrap_or_else(|| {
            std::thread::available_parallelism()
                .expect("available_parallelism() failed")
                .get()
        })
}
/// Handle passed to a running compute job so it can cooperatively observe
/// cancellation while executing on a pool thread.
pub(crate) struct JobStatus<'a, T> {
    // Sender for the job's result; when the caller drops the future returned
    // by `execute`, this channel closes, signaling the result is unwanted.
    result_sender: &'a oneshot::Sender<std::thread::Result<T>>,
    // Optional explicit cancellation flag, propagated from the `CANCEL_JOB`
    // task-local if one was set at enqueue time.
    cancelled: Option<Arc<AtomicBool>>,
}
impl<T> JobStatus<'_, T> {
    /// Lets a long-running job poll whether it should stop early.
    ///
    /// Returns `ControlFlow::Break(())` when the caller has dropped the
    /// result future (the oneshot channel is closed) or the cooperative
    /// cancellation flag is raised; `ControlFlow::Continue(())` otherwise.
    pub(crate) fn check_for_cooperative_cancellation(&self) -> ControlFlow<()> {
        let receiver_gone = self.result_sender.is_closed();
        let flag_raised = matches!(
            &self.cancelled,
            Some(flag) if flag.load(Ordering::Relaxed)
        );
        if receiver_gone || flag_raised {
            return ControlFlow::Break(());
        }
        ControlFlow::Continue(())
    }
}
// Rejection returned when the compute job queue is saturated; surfaced to
// clients as a concurrency-limit GraphQL error (see `to_graphql_error`).
// NOTE(review): `displaydoc::Display` derives the `Display` message from the
// item's `///` doc comment — deliberately using `//` here so the rendered
// message is unchanged.
#[derive(thiserror::Error, Debug, displaydoc::Display, Clone)]
pub(crate) struct ComputeBackPressureError;
/// Distinguishes an error produced by the job itself from a rejection caused
/// by compute-pool overload.
#[derive(Debug)]
pub(crate) enum MaybeBackPressureError<E> {
    /// An error produced by the job's own logic.
    PermanentError(E),
    /// The compute queue was full and the job was never run (load shedding).
    TemporaryError(ComputeBackPressureError),
}
impl<E> From<E> for MaybeBackPressureError<E> {
fn from(error: E) -> Self {
Self::PermanentError(error)
}
}
impl ComputeBackPressureError {
pub(crate) fn to_graphql_error(&self) -> crate::graphql::Error {
crate::graphql::Error::builder()
.message("Your request has been concurrency limited during query processing")
.extension_code("REQUEST_CONCURRENCY_LIMITED")
.build()
}
}
impl crate::graphql::IntoGraphQLErrors for ComputeBackPressureError {
    /// A back-pressure rejection always maps to exactly one GraphQL error.
    fn into_graphql_errors(self) -> Result<Vec<crate::graphql::Error>, Self> {
        let errors = vec![self.to_graphql_error()];
        Ok(errors)
    }
}
/// The kinds of CPU-bound work executed on the compute thread pool.
///
/// `strum::IntoStaticStr` with `snake_case` renders variants as static
/// strings (e.g. `query_planning`) for metric and span attributes.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug, strum::IntoStaticStr)]
#[strum(serialize_all = "snake_case")]
pub(crate) enum ComputeJobType {
    QueryParsing,
    QueryPlanning,
    Introspection,
    // The warmup variants (presumably cache warm-up work — confirm with
    // callers) are scheduled at the lowest priorities; see
    // `From<ComputeJobType> for Priority`.
    QueryParsingWarmup,
    QueryPlanningWarmup,
}
impl From<ComputeJobType> for Priority {
    /// Maps each job type to its scheduling priority, from most urgent
    /// (planning, P8) down to warmup work (P2/P1).
    fn from(job_type: ComputeJobType) -> Self {
        match job_type {
            ComputeJobType::QueryPlanning => Self::P8,
            ComputeJobType::QueryParsing => Self::P4,
            ComputeJobType::Introspection => Self::P3,
            ComputeJobType::QueryPlanningWarmup => Self::P2,
            ComputeJobType::QueryParsingWarmup => Self::P1,
        }
    }
}
impl From<ComputeJobType> for opentelemetry::Value {
    /// Converts the job type into an OpenTelemetry attribute value via its
    /// snake_case static-string form.
    fn from(compute_job_type: ComputeJobType) -> Self {
        let name: &'static str = compute_job_type.into();
        opentelemetry::Value::from(name)
    }
}
/// A unit of work queued for the compute thread pool.
pub(crate) struct Job {
    // Tracing subscriber captured at enqueue time; re-installed on the pool
    // thread while the job runs.
    subscriber: Dispatch,
    // Span current at enqueue time; the execution span is created inside it.
    parent_span: Span,
    // Kind of work; drives priority and metric/span attributes.
    ty: ComputeJobType,
    // When the job entered the queue, used for queue-wait-duration metrics.
    queue_start: Instant,
    // The work itself, already wrapped to catch panics and deliver results.
    job_fn: Box<dyn FnOnce() + Send + 'static>,
    // Parent allocation-tracking context, if memory tracking was active for
    // the enqueuing request.
    allocation_stats: Option<std::sync::Arc<crate::allocator::AllocationStats>>,
}
/// Returns the process-wide compute-job queue, lazily creating the worker
/// thread pool on first use.
///
/// One OS thread per `thread_pool_size()` is spawned; each thread blocks on
/// the queue and runs jobs to completion. A job executes under the tracing
/// subscriber and parent span captured at enqueue time, so spans and logs
/// emitted on the pool thread are attributed to the originating request.
pub(crate) fn queue() -> &'static AgeingPriorityQueue<Job> {
    static QUEUE: OnceLock<AgeingPriorityQueue<Job>> = OnceLock::new();
    QUEUE.get_or_init(|| {
        let pool_size = thread_pool_size();
        for _ in 0..pool_size {
            std::thread::spawn(|| {
                // Re-entrant call: if initialization is still running on the
                // spawning thread, `get_or_init` blocks here until the queue
                // value has been stored.
                let queue = queue();
                let mut receiver = queue.receiver();
                loop {
                    let (job, age) = receiver.blocking_recv();
                    let job_type: &'static str = job.ty.into();
                    let age: &'static str = age.into();
                    // Re-install the enqueuer's subscriber for the duration
                    // of this job (guard restores the previous one on drop).
                    let _subscriber = job.subscriber.set_default();
                    job.parent_span.in_scope(|| {
                        let span = info_span!(
                            COMPUTE_JOB_EXECUTION_SPAN_NAME,
                            "job.type" = job_type,
                            "job.age" = age
                        );
                        span.in_scope(|| {
                            observe_queue_wait_duration(job.ty, job.queue_start.elapsed());
                            // Up-down counter guard: decremented when dropped
                            // at the end of this scope.
                            let _active_metric = i64_up_down_counter_with_unit!(
                                "apollo.router.compute_jobs.active_jobs",
                                "Number of computation jobs in progress",
                                "{job}",
                                1,
                                job.type = job.ty
                            );
                            let job_start = Instant::now();
                            if let Some(stats) = job.allocation_stats {
                                // Run under the enqueuer's allocation-tracking
                                // context so memory is attributed to it.
                                let job_name: &'static str = job.ty.into();
                                crate::allocator::with_parented_memory_tracking(
                                    job_name,
                                    stats,
                                    || {
                                        (job.job_fn)();
                                        #[cfg(all(
                                            feature = "global-allocator",
                                            not(feature = "dhat-heap"),
                                            unix
                                        ))]
                                        if let Some(allocation_stats) = crate::allocator::current()
                                        {
                                            record_metrics(&allocation_stats);
                                        }
                                    },
                                );
                            } else {
                                (job.job_fn)();
                            }
                            observe_compute_duration(job.ty, job_start.elapsed());
                        })
                    });
                }
            });
        }
        tracing::info!(
            threads = pool_size,
            queue_capacity = QUEUE_SOFT_CAPACITY_PER_THREAD * pool_size,
            "compute job thread pool created",
        );
        AgeingPriorityQueue::bounded(QUEUE_SOFT_CAPACITY_PER_THREAD * pool_size)
    })
}
/// Records the memory-allocation histograms for a completed compute job.
///
/// Emits one sample per allocation kind (allocated / deallocated / zeroed /
/// reallocated) on the same histogram, distinguished by the
/// `allocation.type` attribute and tagged with the tracking-context name.
///
/// The four previously copy-pasted macro invocations differed only in the
/// value and the `allocation.type` attribute, so they are now driven by a
/// single data table (the metrics macros already accept expression values,
/// e.g. `context = context_name` in the original).
#[cfg(all(feature = "global-allocator", not(feature = "dhat-heap"), unix))]
fn record_metrics(stats: &crate::allocator::AllocationStats) {
    let context_name = stats.name();
    let samples: [(&'static str, u64); 4] = [
        ("allocated", stats.bytes_allocated() as u64),
        ("deallocated", stats.bytes_deallocated() as u64),
        ("zeroed", stats.bytes_zeroed() as u64),
        ("reallocated", stats.bytes_reallocated() as u64),
    ];
    for (allocation_type, bytes) in samples {
        u64_histogram_with_unit!(
            "apollo.router.query_planner.memory",
            "Memory allocated during query planning",
            "By",
            bytes,
            allocation.type = allocation_type,
            context = context_name
        );
    }
}
task_local! {
    // Optional cooperative-cancellation flag. When a task sets this to
    // `Some(flag)` before calling `execute`, the spawned job can observe the
    // flag via `JobStatus::check_for_cooperative_cancellation`.
    pub(crate) static CANCEL_JOB: Option<Arc<AtomicBool>>;
}
/// Schedules `job` on the compute thread pool and returns a future resolving
/// to its result.
///
/// Returns `Err(ComputeBackPressureError)` immediately when the bounded
/// queue is full (load shedding). If the job panics on the pool thread, the
/// panic payload is forwarded and re-raised when the returned future is
/// awaited.
pub(crate) fn execute<T, F>(
    compute_job_type: ComputeJobType,
    job: F,
) -> Result<impl Future<Output = T>, ComputeBackPressureError>
where
    F: FnOnce(JobStatus<'_, T>) -> T + Send + 'static,
    T: Send + 'static,
{
    let compute_job_type_str: &'static str = compute_job_type.into();
    let span = info_span!(
        COMPUTE_JOB_SPAN_NAME,
        "job.type" = compute_job_type_str,
        "job.outcome" = tracing::field::Empty
    );
    span.in_scope(|| {
        let mut job_watcher = JobWatcher::new(compute_job_type);
        let (tx, rx) = oneshot::channel();
        // Pick up an explicit cancellation flag from the CANCEL_JOB
        // task-local, if the calling task set one up.
        let cancelled = CANCEL_JOB.try_with(|b| b.clone()).ok().flatten();
        let wrapped_job_fn = Box::new(move || {
            let status = JobStatus {
                result_sender: &tx,
                cancelled,
            };
            // Catch panics so a misbehaving job doesn't kill the pool
            // thread; the payload is re-thrown on the awaiting side.
            let result =
                std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || job(status)));
            match tx.send(result) {
                Ok(()) => {}
                Err(_) => {
                    // The caller dropped the future; nobody wants the result.
                }
            }
        });
        let queue = queue();
        let job = Job {
            // Capture the current subscriber and span so the pool thread can
            // attribute tracing output to this request.
            subscriber: Dispatch::default(),
            parent_span: Span::current(),
            ty: compute_job_type,
            job_fn: wrapped_job_fn,
            queue_start: Instant::now(),
            allocation_stats: crate::allocator::current(),
        };
        queue
            .send(Priority::from(compute_job_type), job)
            .map_err(|e| match e {
                SendError::QueueIsFull => {
                    u64_counter!(
                        "apollo.router.compute_jobs.queue_is_full",
                        "Number of requests rejected because the queue for compute jobs is full",
                        1u64
                    );
                    job_watcher.outcome = Outcome::RejectedQueueFull;
                    ComputeBackPressureError
                }
                SendError::Disconnected => {
                    // The queue lives in a `static` (see `queue()`), so its
                    // receiving side can never be dropped.
                    let _proof_of_life: &'static AgeingPriorityQueue<_> = queue;
                    unreachable!("compute thread pool queue is disconnected")
                }
            })?;
        Ok(async move {
            let result = rx.await;
            // Move the watcher into the future so the final outcome is set
            // where the result is observed (presumably reported when the
            // watcher drops — confirm in `metrics::JobWatcher`).
            let mut local_job_watcher = job_watcher;
            local_job_watcher.outcome = match &result {
                Ok(Ok(_)) => Outcome::ExecutedOk,
                Ok(Err(_)) => Outcome::ExecutedError,
                Err(_) => Outcome::ChannelError,
            };
            match result {
                Ok(Ok(value)) => value,
                Ok(Err(panic_payload)) => {
                    // Re-throw the job's panic in the awaiting task.
                    std::panic::resume_unwind(panic_payload)
                }
                Err(e) => {
                    let _: tokio::sync::oneshot::error::RecvError = e;
                    unreachable!("compute result oneshot channel is disconnected")
                }
            }
        }
        .in_current_span())
    })
}
/// Builds the observable gauge reporting how many compute jobs are currently
/// queued; the callback samples the queue length on each metrics collection.
pub(crate) fn create_queue_size_gauge() -> ObservableGauge<u64> {
    let meter = meter_provider().meter("apollo/router");
    meter
        .u64_observable_gauge("apollo.router.compute_jobs.queued")
        .with_description(
            "Number of computation jobs (parsing, planning, …) waiting to be scheduled",
        )
        .with_callback(move |gauge| gauge.observe(queue().queued_count() as u64, &[]))
        .build()
}
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use std::time::Instant;
    use tracing_futures::WithSubscriber;
    use super::*;
    use crate::assert_snapshot_subscriber;
    // Runs a trivial job so the lazily-created pool/queue exists before a
    // test body takes timing measurements.
    async fn ensure_queue_is_initialized() {
        execute(ComputeJobType::Introspection, |_| {})
            .unwrap()
            .await;
    }
    // Snapshot-checks that spans/logs emitted inside a compute job are
    // attributed to the enqueuer's span and subscriber.
    #[tokio::test]
    async fn test_observability() {
        ensure_queue_is_initialized().await;
        async {
            let span = info_span!("test_observability");
            let job = span.in_scope(|| {
                tracing::info!("Outer");
                execute(ComputeJobType::QueryParsing, |_| {
                    tracing::info!("Inner");
                    1
                })
                .unwrap()
            });
            let result = job.await;
            assert_eq!(result, 1);
        }
        .with_subscriber(assert_snapshot_subscriber!())
        .await;
    }
    #[tokio::test]
    async fn test_executes_on_different_thread() {
        let test_thread = std::thread::current().id();
        let job_thread = execute(ComputeJobType::QueryParsing, |_| {
            std::thread::current().id()
        })
        .unwrap()
        .await;
        assert_ne!(job_thread, test_thread)
    }
    #[tokio::test]
    async fn test_parallelism() {
        // Overlap can only be observed with at least two pool threads.
        if thread_pool_size() < 2 {
            return;
        }
        let start = Instant::now();
        let one = execute(ComputeJobType::QueryPlanning, |_| {
            std::thread::sleep(Duration::from_millis(1_000));
            1
        })
        .unwrap();
        let two = execute(ComputeJobType::QueryPlanning, |_| {
            std::thread::sleep(Duration::from_millis(1_000));
            1 + 1
        })
        .unwrap();
        tokio::time::sleep(Duration::from_millis(500)).await;
        assert_eq!(one.await, 1);
        assert_eq!(two.await, 2);
        // Serial execution would take ~2s; parallel should stay well under.
        assert!(start.elapsed() < Duration::from_millis(1_400));
    }
    // Dropping the result future closes the oneshot sender, which the job
    // observes via `check_for_cooperative_cancellation`.
    #[tokio::test]
    async fn test_cancel() {
        let (side_channel_sender, side_channel_receiver) = oneshot::channel();
        let queue_receiver = execute(ComputeJobType::Introspection, move |status| {
            for _ in 0..1_000 {
                std::thread::sleep(Duration::from_millis(10));
                if status.check_for_cooperative_cancellation().is_break() {
                    side_channel_sender.send(Ok(())).unwrap();
                    return;
                }
            }
            side_channel_sender.send(Err(())).unwrap();
        });
        drop(queue_receiver);
        match side_channel_receiver.await {
            Ok(Ok(())) => {}
            e => panic!("job did not cancel as expected: {e:?}"),
        };
    }
    // A pre-set CANCEL_JOB flag should make the job break out on its first
    // cancellation poll.
    #[tokio::test]
    async fn test_cancel_with_task_local() {
        let (side_channel_sender, side_channel_receiver) = oneshot::channel();
        let queue_receiver = crate::compute_job::CANCEL_JOB.scope(
            Some(Arc::new(AtomicBool::new(true))),
            async move {
                execute(ComputeJobType::Introspection, move |status| {
                    for _ in 0..1_000 {
                        std::thread::sleep(Duration::from_millis(10));
                        if status.check_for_cooperative_cancellation().is_break() {
                            side_channel_sender.send(Ok(())).unwrap();
                            return;
                        }
                    }
                    side_channel_sender.send(Err(())).unwrap();
                })
            },
        );
        match tokio::join!(side_channel_receiver, queue_receiver) {
            (Ok(Ok(())), _) => {}
            (e, _) => panic!("job did not cancel as expected: {e:?}"),
        };
    }
}