use crate::{E2eError, Result};
use axum::body::Bytes;
use axum::extract::State;
use axum::http::StatusCode;
use axum::routing::post;
use axum::Router;
use serde::{Deserialize, Serialize};
use std::net::TcpListener;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tokio::sync::oneshot;
use tracing::info;
/// A single HTTP request recorded by the external handler server.
///
/// Values of this type are kept in the in-memory request log and, when a
/// capture file is configured, serialized to that file as one JSON object per line.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CapturedHandlerRequest {
/// Request body decoded as UTF-8; invalid byte sequences are replaced lossily.
pub body: String,
/// Name of the handler that received the request (for example `handler_slim`),
/// without a leading slash.
pub endpoint: String,
}
/// State shared with every route handler.
///
/// Cloning is cheap and every clone refers to the same underlying request log,
/// because the log lives behind an `Arc`.
#[derive(Clone)]
struct HandlerState {
/// In-memory log of recorded requests, in the order they were recorded.
requests: Arc<Mutex<Vec<CapturedHandlerRequest>>>,
/// Optional file to which each recorded request is also appended as a JSON line.
capture_file: Option<PathBuf>,
}
impl HandlerState {
fn new(capture_file: Option<PathBuf>) -> Self {
Self {
requests: Arc::new(Mutex::new(Vec::new())),
capture_file,
}
}
fn add_request(&self, endpoint: &str, body: String) {
let request = CapturedHandlerRequest {
body: body.clone(),
endpoint: endpoint.to_string(),
};
let mut requests = self.requests.lock().unwrap();
requests.push(request.clone());
if let Some(ref path) = self.capture_file {
if let Ok(mut file) = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)
{
use std::io::Write;
let _ = writeln!(
file,
"{}",
serde_json::to_string(&request).unwrap_or_default()
);
}
}
}
fn get_requests(&self) -> Vec<CapturedHandlerRequest> {
let requests = self.requests.lock().unwrap();
requests.clone()
}
fn request_count(&self) -> usize {
let requests = self.requests.lock().unwrap();
requests.len()
}
}
/// Handle to a locally running HTTP server that records the requests it receives.
///
/// The server is started by `new` / `with_capture_file` and runs on a spawned
/// tokio task.
pub struct ExternalHandlerResource {
/// Base URL of the server, in the form `http://127.0.0.1:<port>`.
pub url: String,
/// Loopback TCP port the server listens on.
pub port: u16,
/// State shared with the route handlers; holds the in-memory request log.
state: HandlerState,
/// Optional file that recorded requests are also appended to as JSON lines.
pub capture_file: Option<PathBuf>,
// Never sent on explicitly. The server task awaits the receiving half in its
// graceful-shutdown future, and that await completes once this sender is
// dropped together with the resource.
#[allow(dead_code)]
shutdown_tx: Option<oneshot::Sender<()>>,
}
impl ExternalHandlerResource {
pub async fn new() -> Result<Self> {
Self::with_capture_file(None).await
}
pub async fn with_capture_file(capture_file: Option<PathBuf>) -> Result<Self> {
let listener = TcpListener::bind("127.0.0.1:0").map_err(E2eError::Io)?;
let port = listener.local_addr().map_err(E2eError::Io)?.port();
drop(listener);
let state = HandlerState::new(capture_file.clone());
let state_clone = state.clone();
let app = Router::new()
.route("/handler_slim", post(handle_slim_single))
.route("/handler_slim_envelope", post(handle_slim_single_envelope))
.route("/handler_slim_batch", post(handle_slim_batch))
.route(
"/handler_slim_batch_envelope",
post(handle_slim_batch_envelope),
)
.route("/handler_passthrough", post(handle_passthrough))
.with_state(state_clone);
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let addr = format!("127.0.0.1:{}", port);
let listener = tokio::net::TcpListener::bind(&addr)
.await
.map_err(E2eError::Io)?;
info!("Starting external handler server at: {}", addr);
tokio::spawn(async move {
axum::serve(listener, app)
.with_graceful_shutdown(async move {
let _ = shutdown_rx.await;
})
.await
.ok();
});
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
Ok(Self {
url: format!("http://127.0.0.1:{}", port),
port,
state,
capture_file,
shutdown_tx: Some(shutdown_tx),
})
}
pub fn slim_handler_url(&self) -> String {
format!("{}/handler_slim", self.url)
}
pub fn slim_handler_envelope_url(&self) -> String {
format!("{}/handler_slim_envelope", self.url)
}
pub fn slim_batch_handler_url(&self) -> String {
format!("{}/handler_slim_batch", self.url)
}
pub fn slim_batch_handler_envelope_url(&self) -> String {
format!("{}/handler_slim_batch_envelope", self.url)
}
pub fn passthrough_handler_url(&self) -> String {
format!("{}/handler_passthrough", self.url)
}
pub fn request_count(&self) -> usize {
self.state.request_count()
}
pub fn get_requests(&self) -> Vec<CapturedHandlerRequest> {
self.state.get_requests()
}
pub fn read_captured_file(&self) -> Result<Vec<CapturedHandlerRequest>> {
match &self.capture_file {
Some(path) => {
let content = std::fs::read_to_string(path).map_err(E2eError::Io)?;
let requests: Vec<CapturedHandlerRequest> = content
.lines()
.filter_map(|line| serde_json::from_str(line).ok())
.collect();
Ok(requests)
}
None => Ok(Vec::new()),
}
}
}
/// Flat JSON payload parsed and echoed back by the `handler_slim` and
/// `handler_slim_batch` handlers.
#[derive(Debug, Deserialize, Serialize)]
struct EnvelopeV0 {
/// Row identifier; returned unchanged.
id: String,
/// Payload value; the handlers prefix it with `updated-` in the response.
data: String,
/// Carried in JSON under the key `_gs_op` and returned unchanged.
/// NOTE(review): presumably an operation marker; confirm against the sender.
#[serde(rename = "_gs_op")]
gs_op: String,
}
/// The `metadata` object of an `EnvelopeV1` payload.
#[derive(Debug, Deserialize, Serialize)]
struct EnvelopeMetadata {
/// Returned unchanged by the handlers.
/// NOTE(review): presumably the operation name; confirm against the sender.
op: String,
}
/// The `data` object of an `EnvelopeV1` payload; has the same fields as `EnvelopeV0`.
#[derive(Debug, Deserialize, Serialize)]
struct EnvelopeV1Data {
/// Row identifier; returned unchanged.
id: String,
/// Payload value; the envelope handlers prefix it with `updated-` in the response.
data: String,
/// Carried in JSON under the key `_gs_op` and returned unchanged.
#[serde(rename = "_gs_op")]
gs_op: String,
}
/// Nested JSON payload parsed and echoed back by the `handler_slim_envelope`
/// and `handler_slim_batch_envelope` handlers.
#[derive(Debug, Deserialize, Serialize)]
struct EnvelopeV1 {
/// Metadata section; returned unchanged.
metadata: EnvelopeMetadata,
/// Row section; its `data` field is rewritten by the handlers.
data: EnvelopeV1Data,
}
async fn handle_slim_single(
State(state): State<HandlerState>,
body: Bytes,
) -> (StatusCode, String) {
let body_str = String::from_utf8_lossy(&body).to_string();
state.add_request("handler_slim", body_str.clone());
match serde_json::from_slice::<EnvelopeV0>(&body) {
Ok(mut req) => {
req.data = format!("updated-{}", req.data);
let response = serde_json::to_string(&req).unwrap_or_default();
(StatusCode::OK, response)
}
Err(e) => {
tracing::error!("Failed to parse request: {} - body: {}", e, body_str);
(StatusCode::BAD_REQUEST, format!("Invalid JSON: {}", e))
}
}
}
/// Handles `POST /handler_slim_envelope`.
///
/// Records the raw body, then parses it as a single `EnvelopeV1`. On success the
/// envelope is returned with `data.data` prefixed by `updated-`; on a parse
/// failure the error is logged and `400 Bad Request` is returned.
async fn handle_slim_single_envelope(
    State(state): State<HandlerState>,
    body: Bytes,
) -> (StatusCode, String) {
    let raw = String::from_utf8_lossy(&body).into_owned();
    // Record first so that malformed requests are captured as well.
    state.add_request("handler_slim_envelope", raw.clone());

    let mut envelope = match serde_json::from_slice::<EnvelopeV1>(&body) {
        Ok(parsed) => parsed,
        Err(err) => {
            tracing::error!(
                "Failed to parse envelope request: {} - body: {}",
                err,
                raw
            );
            return (StatusCode::BAD_REQUEST, format!("Invalid JSON: {}", err));
        }
    };

    envelope.data.data = format!("updated-{}", envelope.data.data);
    let payload = serde_json::to_string(&envelope).unwrap_or_default();
    (StatusCode::OK, payload)
}
async fn handle_slim_batch(State(state): State<HandlerState>, body: Bytes) -> (StatusCode, String) {
let body_str = String::from_utf8_lossy(&body).to_string();
state.add_request("handler_slim_batch", body_str.clone());
match serde_json::from_slice::<Vec<EnvelopeV0>>(&body) {
Ok(rows) => {
let updated: Vec<EnvelopeV0> = rows
.into_iter()
.map(|mut r| {
r.data = format!("updated-{}", r.data);
r
})
.collect();
let response = serde_json::to_string(&updated).unwrap_or_default();
(StatusCode::OK, response)
}
Err(e) => {
tracing::error!("Failed to parse batch request: {} - body: {}", e, body_str);
(StatusCode::BAD_REQUEST, format!("Invalid JSON: {}", e))
}
}
}
/// Handles `POST /handler_slim_batch_envelope`.
///
/// Records the raw body, then parses it as a JSON array of `EnvelopeV1`. On
/// success every row is returned with `data.data` prefixed by `updated-`; on a
/// parse failure the error is logged and `400 Bad Request` is returned.
async fn handle_slim_batch_envelope(
    State(state): State<HandlerState>,
    body: Bytes,
) -> (StatusCode, String) {
    let raw = String::from_utf8_lossy(&body).into_owned();
    // Record first so that malformed requests are captured as well.
    state.add_request("handler_slim_batch_envelope", raw.clone());

    let mut rows = match serde_json::from_slice::<Vec<EnvelopeV1>>(&body) {
        Ok(parsed) => parsed,
        Err(err) => {
            tracing::error!(
                "Failed to parse batch envelope request: {} - body: {}",
                err,
                raw
            );
            return (StatusCode::BAD_REQUEST, format!("Invalid JSON: {}", err));
        }
    };

    for row in &mut rows {
        row.data.data = format!("updated-{}", row.data.data);
    }
    let payload = serde_json::to_string(&rows).unwrap_or_default();
    (StatusCode::OK, payload)
}
/// Handles `POST /handler_passthrough`.
///
/// Records the raw body and echoes it back unchanged with `200 OK`. The body is
/// never parsed, so any payload is accepted.
async fn handle_passthrough(
    State(state): State<HandlerState>,
    body: Bytes,
) -> (StatusCode, String) {
    let echoed = String::from_utf8_lossy(&body).into_owned();
    // One copy goes into the request log, the other becomes the response body.
    let recorded = echoed.clone();
    state.add_request("handler_passthrough", recorded);
    (StatusCode::OK, echoed)
}