use std::sync::Arc;
use async_compression::tokio::write::GzipEncoder;
use axum::body::Body;
use bytes::Bytes;
use futures::StreamExt;
use http::Response;
use http::StatusCode;
use http::header;
use tokio::io::AsyncWriteExt;
use tokio_util::io::ReaderStream;
use tower::BoxError;
use super::Config;
use super::DiagnosticsError;
use super::DiagnosticsResult;
use super::archive_utils::ArchiveUtils;
use super::memory;
#[cfg(test)]
mod tests;
#[derive(Debug, Clone)]
pub(super) struct Exporter {
config: Config,
supergraph_schema: Arc<String>,
router_config: Arc<str>,
}
impl Exporter {
pub(super) fn new(
config: Config,
supergraph_schema: Arc<String>,
router_config: Arc<str>,
) -> Self {
Self {
config,
supergraph_schema,
router_config,
}
}
pub(super) async fn export(self) -> Result<Response<Body>, DiagnosticsError> {
tracing::info!("Diagnostic export requested");
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| {
DiagnosticsError::Internal(format!("Failed to get current timestamp: {}", e))
})?
.as_secs();
let filename = format!("router-diagnostics-{}.tar.gz", timestamp);
let data_stream =
Self::create_streaming_archive(self.config, self.supergraph_schema, self.router_config)
.await;
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/gzip")
.header(header::TRANSFER_ENCODING, "chunked")
.header(
header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{}\"", filename),
)
.body(Body::from_stream(data_stream))
.map_err(DiagnosticsError::Http)
}
async fn create_streaming_archive(
config: Config,
supergraph_schema: Arc<String>,
router_config: Arc<str>,
) -> impl futures::Stream<Item = Result<Bytes, BoxError>> + Send + 'static {
let (reader, writer) = tokio::io::simplex(2 * 1024 * 1024);
tokio::task::spawn(async move {
if let Err(e) = Self::create_streaming_archive_async(
&config,
&supergraph_schema,
&router_config,
writer,
)
.await
{
tracing::error!("Failed to create streaming archive: {}", e);
}
});
ReaderStream::new(reader).map(|result| result.map_err(|e| Box::new(e) as BoxError))
}
async fn create_streaming_archive_async<
W: tokio::io::AsyncWrite + Unpin + Send + Sync + 'static,
>(
config: &Config,
supergraph_schema: &str,
router_config: &str,
writer: W,
) -> DiagnosticsResult<()> {
let encoder = GzipEncoder::new(writer);
let mut tar = tokio_tar::Builder::new(encoder);
Self::add_manifest_to_archive(&mut tar, config).await?;
Self::add_router_config_to_archive(&mut tar, router_config).await?;
Self::add_supergraph_schema_to_archive(&mut tar, supergraph_schema).await?;
Self::add_system_info_to_archive(&mut tar).await?;
Self::add_memory_data_to_archive(&mut tar, config).await?;
Self::add_html_report_to_archive(&mut tar, config, router_config, supergraph_schema)
.await?;
let mut encoder = tar
.into_inner()
.await
.map_err(|e| DiagnosticsError::Internal(format!("Failed to finalize tar: {}", e)))?;
encoder
.shutdown()
.await
.map_err(|e| DiagnosticsError::Internal(format!("Failed to finalize gzip: {}", e)))?;
Ok(())
}
async fn add_manifest_to_archive<W: tokio::io::AsyncWrite + Unpin + Send + Sync>(
tar: &mut tokio_tar::Builder<W>,
config: &Config,
) -> DiagnosticsResult<()> {
let manifest = Self::create_main_manifest(config)?;
ArchiveUtils::add_text_file(tar, "manifest.txt", &manifest).await
}
async fn add_router_config_to_archive<W: tokio::io::AsyncWrite + Unpin + Send + Sync>(
tar: &mut tokio_tar::Builder<W>,
router_config: &str,
) -> DiagnosticsResult<()> {
ArchiveUtils::add_text_file(tar, "router.yaml", router_config).await
}
async fn add_supergraph_schema_to_archive<W: tokio::io::AsyncWrite + Unpin + Send + Sync>(
tar: &mut tokio_tar::Builder<W>,
supergraph_schema: &str,
) -> DiagnosticsResult<()> {
ArchiveUtils::add_text_file(tar, "supergraph.graphql", supergraph_schema).await
}
async fn add_system_info_to_archive<W: tokio::io::AsyncWrite + Unpin + Send + Sync>(
tar: &mut tokio_tar::Builder<W>,
) -> DiagnosticsResult<()> {
let system_info = crate::plugins::diagnostics::system_info::collect().await?;
ArchiveUtils::add_text_file(tar, "system_info.txt", &system_info).await
}
async fn add_memory_data_to_archive<W: tokio::io::AsyncWrite + Unpin + Send + Sync>(
tar: &mut tokio_tar::Builder<W>,
config: &Config,
) -> DiagnosticsResult<()> {
memory::MemoryService::add_to_archive(tar, &config.output_directory).await
}
async fn add_html_report_to_archive<W: tokio::io::AsyncWrite + Unpin + Send + Sync>(
tar: &mut tokio_tar::Builder<W>,
config: &Config,
router_config: &str,
supergraph_schema: &str,
) -> DiagnosticsResult<()> {
use std::path::Path;
use crate::plugins::diagnostics::html_generator::HtmlGenerator;
use crate::plugins::diagnostics::html_generator::ReportData;
let generator = HtmlGenerator::new()?;
let system_info_content = crate::plugins::diagnostics::system_info::collect().await?;
let memory_directory = Path::new(&config.output_directory).join("memory");
let memory_dumps = memory::load_memory_dumps(&memory_directory).await?;
let report_data = ReportData::new(
Some(&system_info_content),
Some(router_config),
Some(supergraph_schema),
&memory_dumps,
);
let html_content = generator.generate_embedded_html(report_data)?;
ArchiveUtils::add_text_file(tar, "diagnostics_report.html", &html_content).await
}
fn create_main_manifest(config: &Config) -> DiagnosticsResult<String> {
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| {
DiagnosticsError::Internal(format!("Failed to get current timestamp: {}", e))
})?
.as_secs();
let memory_profiling_info = if cfg!(target_family = "unix") {
"Memory Profiling: Enabled (jemalloc profiling available)"
} else {
"Memory Profiling: Not available - Heap dumps require Linux platform with jemalloc"
};
let manifest = format!(
"APOLLO ROUTER DIAGNOSTIC ARCHIVE\n\
Generated: {}\n\
Router Version: {}\n\
Platform: {}\n\
Memory Output Directory: {}\n\
{}\n\
\n\
Contents: manifest.txt, router.yaml, supergraph.graphql, system_info.txt, memory/, diagnostics_report.html\n",
timestamp,
env!("CARGO_PKG_VERSION"),
std::env::consts::OS,
config.output_directory.to_string_lossy(),
memory_profiling_info
);
Ok(manifest)
}
#[cfg(test)]
pub(super) fn config(&self) -> &Config {
&self.config
}
}