use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use futures::StreamExt;
use futures::stream::once;
use tokio::fs;
use tower::BoxError;
use tower::ServiceBuilder;
use tower::ServiceExt as TowerServiceExt;
use super::recording::Recording;
use super::recording::RequestDetails;
use super::recording::ResponseDetails;
use super::recording::Subgraph;
use crate::layers::ServiceBuilderExt;
use crate::plugin::Plugin;
use crate::plugin::PluginInit;
use crate::services::execution;
use crate::services::external::externalize_header_map;
use crate::services::router;
use crate::services::router::body::RouterBody;
use crate::services::subgraph;
use crate::services::supergraph;
const RECORD_HEADER: &str = "x-apollo-router-record";
#[derive(Debug, Clone, serde::Deserialize, schemars::JsonSchema)]
#[serde(deny_unknown_fields)]
struct RecordConfig {
enabled: bool,
storage_path: Option<PathBuf>,
}
fn default_storage_path() -> PathBuf {
std::env::current_dir().expect("failed to get current directory")
}
#[derive(Debug)]
struct Record {
enabled: bool,
supergraph_sdl: Arc<String>,
storage_path: Arc<Path>,
}
register_plugin!("experimental", "record", Record);
#[async_trait::async_trait]
impl Plugin for Record {
type Config = RecordConfig;
async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError> {
let storage_path = init
.config
.storage_path
.unwrap_or_else(default_storage_path);
let plugin = Self {
enabled: init.config.enabled,
supergraph_sdl: init.supergraph_sdl.clone(),
storage_path: storage_path.clone().into(),
};
if init.config.enabled {
write_file(
storage_path.into(),
&PathBuf::from("README.md"),
include_str!("recording-readme.md").as_bytes(),
)
.await?;
}
Ok(plugin)
}
fn router_service(&self, service: router::BoxService) -> router::BoxService {
if !self.enabled {
return service;
}
let dir = self.storage_path.clone();
ServiceBuilder::new()
.map_future(move |future| {
let dir = dir.clone();
async move {
let res: router::Response = future.await?;
let (parts, stream) = res.response.into_parts();
let headers = parts.headers.clone();
let context = res.context.clone();
let after_complete = once(async move {
let recording = context
.extensions()
.with_lock(|mut lock| lock.remove::<Recording>());
if let Some(mut recording) = recording {
let res_headers = externalize_header_map(&headers)?;
recording.client_response.headers = res_headers;
let filename = recording.filename();
let contents = serde_json::to_value(recording)?;
tokio::spawn(async move {
tracing::info!("Writing recording to {:?}", filename);
write_file(
dir,
&filename,
serde_json::to_string_pretty(&contents)?.as_bytes(),
)
.await?;
Ok::<(), BoxError>(())
})
.await??;
}
Ok::<Option<_>, BoxError>(None)
})
.filter_map(|a| async move { a.unwrap() });
let stream = stream.chain(after_complete);
Ok(router::Response {
context: res.context,
response: http::Response::from_parts(
parts,
RouterBody::wrap_stream(stream).into_inner(),
),
})
}
})
.service(service)
.boxed()
}
fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
if !self.enabled {
return service;
}
let supergraph_sdl = self.supergraph_sdl.clone();
ServiceBuilder::new()
.map_request(move |req: supergraph::Request| {
if is_introspection(&req) {
return req;
}
let recording_enabled =
if req.supergraph_request.headers().contains_key(RECORD_HEADER) {
req.context.extensions().with_lock(|mut lock| {
lock.insert(Recording {
supergraph_sdl: supergraph_sdl.clone().to_string(),
client_request: Default::default(),
client_response: Default::default(),
formatted_query_plan: Default::default(),
subgraph_fetches: Default::default(),
})
});
true
} else {
false
};
if recording_enabled {
let query = req.supergraph_request.body().query.clone();
let operation_name = req.supergraph_request.body().operation_name.clone();
let variables = req.supergraph_request.body().variables.clone();
let headers = externalize_header_map(req.supergraph_request.headers())
.expect("failed to externalize header map");
let method = req.supergraph_request.method().to_string();
let uri = req.supergraph_request.uri().to_string();
req.context.extensions().with_lock(|mut lock| {
if let Some(recording) = lock.get_mut::<Recording>() {
recording.client_request = RequestDetails {
query,
operation_name,
variables,
headers,
method,
uri,
};
}
});
}
req
})
.map_response(|res: supergraph::Response| {
let context = res.context.clone();
res.map_stream(move |chunk| {
context.extensions().with_lock(|mut lock| {
if let Some(recording) = lock.get_mut::<Recording>() {
recording.client_response.chunks.push(chunk.clone());
}
});
chunk
})
})
.service(service)
.boxed()
}
fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
ServiceBuilder::new()
.map_request(|req: execution::Request| {
req.context.extensions().with_lock(|mut lock| {
if let Some(recording) = lock.get_mut::<Recording>() {
recording.formatted_query_plan =
req.query_plan.formatted_query_plan.clone();
}
});
req
})
.service(service)
.boxed()
}
fn subgraph_service(
&self,
subgraph_name: &str,
service: subgraph::BoxService,
) -> subgraph::BoxService {
if !self.enabled {
return service;
}
let subgraph_name = String::from(subgraph_name);
ServiceBuilder::new()
.map_future_with_request_data(
|req: &subgraph::Request| RequestDetails {
query: req.subgraph_request.body().query.clone(),
operation_name: req.subgraph_request.body().operation_name.clone(),
variables: req.subgraph_request.body().variables.clone(),
headers: externalize_header_map(req.subgraph_request.headers())
.expect("failed to externalize header map"),
method: req.subgraph_request.method().to_string(),
uri: req.subgraph_request.uri().to_string(),
},
move |req: RequestDetails, future| {
let subgraph_name = subgraph_name.clone();
async move {
let res: subgraph::ServiceResult = future.await;
let operation_name = req
.operation_name
.clone()
.unwrap_or_else(|| "UnnamedOperation".to_string());
let res = match res {
Ok(res) => {
let subgraph = Subgraph {
subgraph_name,
response: ResponseDetails {
headers: externalize_header_map(
&res.response.headers().clone(),
)
.expect("failed to externalize header map"),
chunks: vec![res.response.body().clone()],
},
request: req,
};
res.context.extensions().with_lock(|mut lock| {
if let Some(recording) = lock.get_mut::<Recording>() {
if recording.subgraph_fetches.is_none() {
recording.subgraph_fetches = Some(Default::default());
}
if let Some(fetches) = &mut recording.subgraph_fetches {
fetches.insert(operation_name, subgraph);
}
}
});
Ok(res)
}
Err(err) => Err(err),
};
res
}
},
)
.service(service)
.boxed()
}
}
async fn write_file(dir: Arc<Path>, path: &PathBuf, contents: &[u8]) -> Result<(), BoxError> {
let path = dir.join(path);
let dir = path.parent().ok_or("invalid record directory")?;
fs::create_dir_all(dir).await?;
fs::write(path, contents).await?;
Ok(())
}
fn is_introspection(request: &supergraph::Request) -> bool {
request
.context
.unsupported_executable_document()
.is_some_and(|doc| {
doc.operations
.get(request.supergraph_request.body().operation_name.as_deref())
.ok()
.is_some_and(|op| {
op.root_fields(&doc).all(|field| {
matches!(field.name.as_str(), "__typename" | "__schema" | "__type")
})
})
})
}