use std::{
collections::HashMap,
error::Error,
net::{IpAddr, SocketAddr, ToSocketAddrs},
sync::{Arc, Mutex, atomic::{AtomicU64, Ordering}},
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
use futures_util::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use reqwest::{
StatusCode,
dns::{Addrs as ReqwestAddrs, Name as ReqwestDnsName, Resolve as ReqwestResolve, Resolving},
header::{HeaderMap, HeaderName, HeaderValue},
};
use serde_json::Value;
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::mpsc,
time::timeout,
};
use tokio_tungstenite::{
connect_async,
tungstenite::{client::IntoClientRequest, protocol::Message},
WebSocketStream,
};
use crate::parser::{self, context};
use super::{
ArtifactTimingPhase, ArtifactTranscript, ArtifactTranscriptDirection,
AssertionOutcome, ExecutionArtifact, ExecutionObserver, ExecutionEvent, FormDataType,
GraphqlOperation, HttpOperation, McpCall, McpOperation, Request, RequestExecution,
RequestExecutionError, RequestOperation, RequestProtocol, ResponseSnapshot,
SessionRegistry, SseAction, SseOperation, WsAction, WsOperation,
parse_within_duration,
};
use super::session::{SseMessage, WsCommand, WsMessage, WsMessageKind};
use super::response_capture::{
INTERNAL_SSE_EVENT_HEADER, INTERNAL_SSE_ID_HEADER, INTERNAL_WS_KIND_HEADER,
resolve_redaction_body_value,
};
// Process-wide atomic counter starting at 0.
// NOTE(review): presumably used to keep generated multipart boundaries unique; its users are outside this view — confirm.
static MULTIPART_BOUNDARY_COUNTER: AtomicU64 = AtomicU64::new(0);
// Process-wide atomic counter starting at 1.
// NOTE(review): presumably the source of JSON-RPC ids for MCP calls; its users are outside this view — confirm.
static MCP_JSON_RPC_ID_COUNTER: AtomicU64 = AtomicU64::new(1);
// Default MCP protocol version string.
// NOTE(review): presumably used when a request does not specify a version — confirm against the MCP resolver.
const DEFAULT_MCP_PROTOCOL_VERSION: &str = "2025-11-25";
// Default MCP client name.
// NOTE(review): presumably reported to the server during initialization — confirm against the MCP resolver.
const DEFAULT_MCP_CLIENT_NAME: &str = "hen";
/// Accumulates the duration of each DNS lookup that is reported to it.
///
/// Measurements are kept behind a `Mutex` so the recorder can be written to
/// through a shared reference.
#[derive(Debug, Default)]
struct DnsTimingRecorder {
    /// Lookup durations, in the order they were recorded.
    durations: Mutex<Vec<Duration>>,
}

impl DnsTimingRecorder {
    /// Appends one lookup duration.
    ///
    /// A poisoned lock is recovered instead of ignored: the protected value is
    /// a plain `Vec<Duration>`, which stays valid even if another holder of
    /// the lock panicked, so there is no reason to drop the measurement.
    fn record(&self, duration: Duration) {
        self.durations
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .push(duration);
    }

    /// Returns one `"dns"` timing phase per recorded duration, in recording order.
    ///
    /// Like [`DnsTimingRecorder::record`], this recovers from a poisoned lock
    /// so already-recorded measurements are still reported.
    fn phases(&self) -> Vec<ArtifactTimingPhase> {
        self.durations
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .iter()
            .map(|duration| ArtifactTimingPhase::new("dns", *duration))
            .collect()
    }
}
/// `reqwest` DNS resolver that measures how long each host lookup takes and
/// reports the duration to a shared [`DnsTimingRecorder`].
#[derive(Clone)]
struct TimingDnsResolver {
    /// Destination for measured lookup durations.
    recorder: Arc<DnsTimingRecorder>,
}

impl TimingDnsResolver {
    /// Creates a resolver that reports lookup durations to `recorder`.
    fn new(recorder: Arc<DnsTimingRecorder>) -> Self {
        TimingDnsResolver { recorder }
    }
}

impl ReqwestResolve for TimingDnsResolver {
    /// Resolves `name` to socket addresses (port 0).
    ///
    /// IP literals are returned directly and nothing is recorded for them.
    /// Other hosts are resolved with the blocking `ToSocketAddrs` lookup on a
    /// blocking task; the elapsed time is recorded only when the lookup succeeds.
    fn resolve(&self, name: ReqwestDnsName) -> Resolving {
        let host = name.as_str().to_string();
        let recorder = Arc::clone(&self.recorder);
        Box::pin(async move {
            if let Ok(ip) = host.parse::<IpAddr>() {
                let literal: ReqwestAddrs = Box::new(std::iter::once(SocketAddr::new(ip, 0)));
                return Ok(literal);
            }

            let lookup_started = Instant::now();
            // `to_socket_addrs` blocks, so it runs on the blocking thread pool.
            let joined = tokio::task::spawn_blocking(
                move || -> Result<Vec<SocketAddr>, std::io::Error> {
                    let found = (host.as_str(), 0).to_socket_addrs()?;
                    Ok(found.collect::<Vec<_>>())
                },
            )
            .await;
            // First layer: the blocking task itself failed (panic/cancellation).
            let looked_up = joined.map_err(|err| -> Box<dyn Error + Send + Sync> { Box::new(err) })?;
            // Second layer: the lookup returned an I/O error.
            let resolved = looked_up.map_err(|err| -> Box<dyn Error + Send + Sync> { Box::new(err) })?;
            recorder.record(lookup_started.elapsed());

            let resolved: ReqwestAddrs = Box::new(resolved.into_iter());
            Ok(resolved)
        })
    }
}
/// Builds a `reqwest` client whose DNS lookups are timed.
///
/// Returns the client together with the recorder that its resolver writes to.
/// A client build failure is converted into a `RequestExecutionError` carrying
/// the error's display text.
fn build_timed_reqwest_client() -> Result<(reqwest::Client, Arc<DnsTimingRecorder>), RequestExecutionError> {
    let recorder: Arc<DnsTimingRecorder> = Arc::new(DnsTimingRecorder::default());
    let timing_resolver = TimingDnsResolver::new(Arc::clone(&recorder));
    let built = reqwest::Client::builder()
        .dns_resolver(Arc::new(timing_resolver))
        .build();
    match built {
        Ok(client) => Ok((client, recorder)),
        Err(err) => Err(RequestExecutionError::new(err.to_string())),
    }
}
/// Executes one request by dispatching on its operation kind.
///
/// The effective context is `inherited_context` overlaid with the request's
/// own context entries (request entries win on key collisions). GraphQL
/// requests are first resolved into an HTTP operation and contribute extra
/// transcript attributes and `GRAPHQL_*` exports; every other kind is
/// forwarded to its dedicated executor.
pub(crate) async fn execute_request(
    request: &Request,
    inherited_context: &HashMap<String, String>,
    inherited_sensitive_values: &[String],
    dependency_artifacts: &HashMap<String, ExecutionArtifact>,
    observer: Option<&ExecutionObserver>,
    sessions: &SessionRegistry,
) -> Result<RequestExecution, RequestExecutionError> {
    let mut context_map = inherited_context.clone();
    for (name, entry) in &request.context {
        context_map.insert(name.clone(), entry.clone());
    }

    match &request.operation {
        RequestOperation::Http(operation) => {
            execute_http_request(
                request,
                operation,
                RequestProtocol::Http,
                "http",
                HashMap::new(),
                HashMap::new(),
                &context_map,
                inherited_sensitive_values,
                dependency_artifacts,
                observer,
                sessions,
            )
            .await
        }
        RequestOperation::Graphql(operation) => {
            let graphql = resolve_graphql_request(operation, &context_map)
                .map_err(|err| RequestExecutionError::new(err.to_string()))?;

            // Transcript attributes and exported variables are filled side by side.
            let mut attributes = HashMap::new();
            let mut exports = HashMap::new();
            exports.insert("GRAPHQL_DOCUMENT".to_string(), graphql.document.clone());
            if let Some(operation_name) = &graphql.operation_name {
                attributes.insert("operationName".to_string(), operation_name.clone());
                exports.insert("GRAPHQL_OPERATION".to_string(), operation_name.clone());
            }
            if let Some(variables_json) = &graphql.variables_json {
                attributes.insert("variables".to_string(), variables_json.clone());
                exports.insert("GRAPHQL_VARIABLES".to_string(), variables_json.clone());
            }

            execute_http_request(
                request,
                &graphql.http_operation,
                RequestProtocol::Graphql,
                "graphql",
                attributes,
                exports,
                &context_map,
                inherited_sensitive_values,
                dependency_artifacts,
                observer,
                sessions,
            )
            .await
        }
        RequestOperation::Mcp(operation) => {
            execute_mcp_request(
                request,
                operation,
                &context_map,
                inherited_sensitive_values,
                dependency_artifacts,
                observer,
                sessions,
            )
            .await
        }
        RequestOperation::Sse(operation) => {
            execute_sse_request(
                request,
                operation,
                &context_map,
                inherited_sensitive_values,
                dependency_artifacts,
                observer,
                sessions,
            )
            .await
        }
        RequestOperation::Ws(operation) => {
            execute_ws_request(
                request,
                operation,
                &context_map,
                inherited_sensitive_values,
                dependency_artifacts,
                observer,
                sessions,
            )
            .await
        }
    }
}
/// Renders `request` as a curl command line, or as a `#` comment line when the
/// step has no standalone curl equivalent or cannot be materialized.
pub(crate) fn render_curl(request: &Request) -> String {
    match &request.operation {
        RequestOperation::Http(operation) => render_http_curl(operation),
        RequestOperation::Graphql(operation) => materialize_graphql_http_operation(
            &operation.http,
            operation.operation_name.clone(),
            operation.document.clone(),
            operation.variables_json.clone(),
        )
        .map(|http_operation| render_http_curl(&http_operation))
        .unwrap_or_else(|err| format!("# invalid graphql request: {}", err)),
        RequestOperation::Mcp(operation) => {
            let call = resolved_mcp_call_for_render(&operation.call);
            // The rendered command always uses 1 as the third argument.
            materialize_mcp_http_operation(&operation.http, &call, 1)
                .map(|http_operation| render_http_curl(&http_operation))
                .unwrap_or_else(|err| format!("# invalid mcp request: {}", err))
        }
        RequestOperation::Sse(operation) => match &operation.action {
            SseAction::Receive { .. } => {
                "# sse receive step has no standalone curl equivalent".to_string()
            }
            SseAction::Open => {
                let open_operation = materialize_sse_open_http_operation(&operation.http);
                render_http_curl(&open_operation)
            }
        },
        RequestOperation::Ws(_) => {
            "# websocket steps do not have a standalone curl equivalent".to_string()
        }
    }
}
/// Executes an MCP step over HTTP.
///
/// The MCP request is resolved and sent through `execute_http_request`. When
/// the resolved call is an `Initialize`, the initialized notification is sent
/// afterwards and, if the request names a session, the session registry entry
/// is upserted with the URL, the serialized headers and any
/// `protocolVersion` / `serverInfo.name` / `serverInfo.version` strings found
/// under `result` in the response JSON.
async fn execute_mcp_request(
    request: &Request,
    operation: &McpOperation,
    context_map: &HashMap<String, String>,
    inherited_sensitive_values: &[String],
    dependency_artifacts: &HashMap<String, ExecutionArtifact>,
    observer: Option<&ExecutionObserver>,
    sessions: &SessionRegistry,
) -> Result<RequestExecution, RequestExecutionError> {
    let resolved = resolve_mcp_request(request, operation, context_map, sessions)
        .map_err(|err| RequestExecutionError::new(err.to_string()))?;
    let execution = execute_http_request(
        request,
        &resolved.http_operation,
        RequestProtocol::Mcp,
        "mcp",
        resolved.request_attributes,
        resolved.export_overrides,
        context_map,
        inherited_sensitive_values,
        dependency_artifacts,
        observer,
        sessions,
    )
    .await?;

    // Only `initialize` calls have follow-up work.
    if !matches!(resolved.call, ResolvedMcpCall::Initialize(_)) {
        return Ok(execution);
    }

    send_mcp_initialized_notification(&resolved.http_operation, context_map)
        .await
        .map_err(|err| RequestExecutionError::new(err.to_string()))?;

    if let Some(session_name) = &request.session_name {
        let mut metadata = HashMap::new();
        metadata.insert("url".to_string(), resolved.http_operation.url.clone());
        let headers_json = serde_json::to_string(&resolved.http_operation.headers)
            .expect("mcp session headers should serialize");
        metadata.insert("headersJson".to_string(), headers_json);

        let initialize_result = execution
            .artifact
            .json
            .as_ref()
            .and_then(|json| json.get("result"));
        if let Some(result) = initialize_result {
            if let Some(protocol_version) = result
                .get("protocolVersion")
                .and_then(|value| value.as_str())
            {
                metadata.insert("protocolVersion".to_string(), protocol_version.to_string());
            }
            let server_info = result.get("serverInfo");
            if let Some(server_name) = server_info
                .and_then(|info| info.get("name"))
                .and_then(|value| value.as_str())
            {
                metadata.insert("serverName".to_string(), server_name.to_string());
            }
            if let Some(server_version) = server_info
                .and_then(|info| info.get("version"))
                .and_then(|value| value.as_str())
            {
                metadata.insert("serverVersion".to_string(), server_version.to_string());
            }
        }
        sessions.upsert(session_name.clone(), RequestProtocol::Mcp, metadata);
    }

    Ok(execution)
}
/// Routes an SSE step to the executor for its action (`Open` or `Receive`).
async fn execute_sse_request(
    request: &Request,
    operation: &SseOperation,
    context_map: &HashMap<String, String>,
    inherited_sensitive_values: &[String],
    dependency_artifacts: &HashMap<String, ExecutionArtifact>,
    observer: Option<&ExecutionObserver>,
    sessions: &SessionRegistry,
) -> Result<RequestExecution, RequestExecutionError> {
    match &operation.action {
        SseAction::Receive { within } => {
            let within = within.as_str();
            execute_sse_receive(
                request,
                operation,
                within,
                context_map,
                inherited_sensitive_values,
                dependency_artifacts,
                observer,
                sessions,
            )
            .await
        }
        SseAction::Open => {
            execute_sse_open(
                request,
                operation,
                context_map,
                inherited_sensitive_values,
                dependency_artifacts,
                observer,
                sessions,
            )
            .await
        }
    }
}
/// Routes a WebSocket step to the executor for its action
/// (`Open`, `Send`, `Exchange` or `Receive`).
async fn execute_ws_request(
    request: &Request,
    operation: &WsOperation,
    context_map: &HashMap<String, String>,
    inherited_sensitive_values: &[String],
    dependency_artifacts: &HashMap<String, ExecutionArtifact>,
    observer: Option<&ExecutionObserver>,
    sessions: &SessionRegistry,
) -> Result<RequestExecution, RequestExecutionError> {
    match &operation.action {
        WsAction::Receive { within } => {
            let within = within.as_str();
            execute_ws_receive(
                request,
                operation,
                within,
                context_map,
                inherited_sensitive_values,
                dependency_artifacts,
                observer,
                sessions,
            )
            .await
        }
        WsAction::Exchange { kind, payload, within } => {
            let (kind, payload, within) = (kind.as_str(), payload.as_str(), within.as_str());
            execute_ws_exchange(
                request,
                operation,
                kind,
                payload,
                within,
                context_map,
                inherited_sensitive_values,
                dependency_artifacts,
                observer,
                sessions,
            )
            .await
        }
        WsAction::Send { kind, payload } => {
            let (kind, payload) = (kind.as_str(), payload.as_str());
            execute_ws_send(
                request,
                operation,
                kind,
                payload,
                context_map,
                inherited_sensitive_values,
                dependency_artifacts,
                observer,
                sessions,
            )
            .await
        }
        WsAction::Open => {
            execute_ws_open(
                request,
                operation,
                context_map,
                inherited_sensitive_values,
                dependency_artifacts,
                observer,
                sessions,
            )
            .await
        }
    }
}
/// Sends one HTTP-based request and turns the response into an execution result.
///
/// Steps: build a DNS-timed client and the outgoing request, send it, read the
/// whole body (lossy UTF-8, NUL characters removed), normalize the body for
/// `protocol`, assemble an `ExecutionArtifact` with a request/response
/// transcript labelled `<transcript_label_prefix>.request` / `.response`, attach
/// `dns`, `responseStart` and `bodyRead` timing phases, and hand over to
/// `finalize_execution`.
///
/// `extra_request_attributes` are merged over the default `url`/`method`
/// transcript attributes; `extra_exports` are forwarded to `finalize_execution`.
/// `_sessions` is accepted for signature parity and is not used here.
async fn execute_http_request(
    request: &Request,
    operation: &HttpOperation,
    protocol: RequestProtocol,
    transcript_label_prefix: &str,
    extra_request_attributes: HashMap<String, String>,
    extra_exports: HashMap<String, String>,
    context_map: &HashMap<String, String>,
    inherited_sensitive_values: &[String],
    dependency_artifacts: &HashMap<String, ExecutionArtifact>,
    observer: Option<&ExecutionObserver>,
    _sessions: &SessionRegistry,
) -> Result<RequestExecution, RequestExecutionError> {
    let (client, dns_timing) = build_timed_reqwest_client()?;
    let outgoing = build_reqwest_request(operation, &client, context_map)
        .map_err(|err| RequestExecutionError::new(err.to_string()))?;

    let mut request_attributes = HashMap::from([
        ("url".to_string(), outgoing.url().to_string()),
        ("method".to_string(), outgoing.method().as_str().to_string()),
    ]);
    // Caller-supplied attributes may override the defaults above.
    request_attributes.extend(extra_request_attributes);

    let request_body = match outgoing.body().and_then(|body| body.as_bytes()) {
        Some(bytes) => String::from_utf8_lossy(bytes).into_owned(),
        None => String::new(),
    };
    log::debug!("REQUEST\n{:#?}", outgoing);

    let response_start_timer = Instant::now();
    let response = client
        .execute(outgoing)
        .await
        .map_err(|err| RequestExecutionError::new(err.to_string()))?;
    let response_start_duration = response_start_timer.elapsed();
    log::debug!("RESPONSE\n{:#?}", response);

    let status = response.status();
    let headers = response.headers().clone();

    let body_read_timer = Instant::now();
    let body_bytes = response
        .bytes()
        .await
        .map_err(|err| RequestExecutionError::new(err.to_string()))?;
    let raw_response_text = String::from_utf8_lossy(&body_bytes).into_owned();
    let body_read_duration = body_read_timer.elapsed();

    let sanitized_response = raw_response_text.replace('\0', "");
    // Only MCP responses are matched against the JSON-RPC id of the request.
    let expected_mcp_response_id = match protocol {
        RequestProtocol::Mcp => extract_json_rpc_id(&request_body),
        _ => None,
    };
    let normalized_response = normalize_http_response_body(
        protocol,
        &headers,
        sanitized_response.as_str(),
        expected_mcp_response_id.as_ref(),
    );

    let response_attributes =
        HashMap::from([("status".to_string(), status.as_u16().to_string())]);
    let outgoing_transcript = ArtifactTranscript {
        direction: ArtifactTranscriptDirection::Outgoing,
        label: format!("{}.request", transcript_label_prefix),
        body: request_body,
        attributes: request_attributes,
    };
    let incoming_transcript = ArtifactTranscript {
        direction: ArtifactTranscriptDirection::Incoming,
        label: format!("{}.response", transcript_label_prefix),
        body: sanitized_response.clone(),
        attributes: response_attributes,
    };

    let mut artifact = ExecutionArtifact::new_http(
        status,
        headers.clone(),
        normalized_response.body.clone(),
        normalized_response.json.clone(),
        vec![outgoing_transcript, incoming_transcript],
    );
    artifact.protocol = protocol;
    artifact.timing_phases = dns_timing.phases();
    artifact
        .timing_phases
        .push(ArtifactTimingPhase::new("responseStart", response_start_duration));
    artifact
        .timing_phases
        .push(ArtifactTimingPhase::new("bodyRead", body_read_duration));

    finalize_execution(
        request,
        artifact,
        context_map,
        inherited_sensitive_values,
        dependency_artifacts,
        observer,
        extra_exports,
    )
}
fn finalize_execution(
request: &Request,
artifact: ExecutionArtifact,
context_map: &HashMap<String, String>,
inherited_sensitive_values: &[String],
dependency_artifacts: &HashMap<String, ExecutionArtifact>,
observer: Option<&ExecutionObserver>,
extra_exports: HashMap<String, String>,
) -> Result<RequestExecution, RequestExecutionError> {
let current_snapshot = ResponseSnapshot::from(&artifact);
let dependency_snapshots: HashMap<String, ResponseSnapshot> = dependency_artifacts
.iter()
.map(|(name, artifact)| (name.clone(), ResponseSnapshot::from(artifact)))
.collect();
let mut captured_values: HashMap<String, String> = HashMap::new();
let mut capture_context = context_map.clone();
for capture in &request.response_captures {
let source_artifact = match &capture.source {
super::response_capture::CaptureSource::Current => ¤t_snapshot,
super::response_capture::CaptureSource::Dependency(name) => dependency_snapshots
.get(name)
.ok_or_else(|| {
RequestExecutionError::with_artifact(
super::CaptureDependencyError {
dependency: name.clone(),
request: request.description.clone(),
}
.to_string(),
artifact.clone(),
)
.with_sensitive_values(collect_runtime_sensitive_values(
request,
inherited_sensitive_values,
&artifact,
&capture_context,
))
})?,
};
match capture.extract_from_snapshot(source_artifact, &capture_context) {
Ok(Some((name, value))) => {
capture_context.insert(name.clone(), value.clone());
captured_values.insert(name, value);
}
Ok(None) => {}
Err(err) => {
return Err(RequestExecutionError::with_artifact(
err.to_string(),
artifact.clone(),
)
.with_sensitive_values(collect_runtime_sensitive_values(
request,
inherited_sensitive_values,
&artifact,
&capture_context,
)))
}
}
}
if !captured_values.is_empty() {
log::debug!("CAPTURED VALUES\n{:#?}", captured_values);
}
let status_code = artifact.status.as_str().to_string();
let mut export_env: HashMap<String, String> = context_map.clone();
export_env.insert("RESPONSE".to_string(), artifact.body.clone());
export_env.insert("STATUS".to_string(), status_code.clone());
if let Some(reason) = artifact.status.canonical_reason() {
export_env.insert("STATUS_TEXT".to_string(), reason.to_string());
}
export_env.insert("DESCRIPTION".to_string(), request.description.clone());
export_env.extend(extra_exports);
for (key, value) in &captured_values {
export_env.insert(key.clone(), value.replace('\0', ""));
}
let mut assignment_callbacks: Vec<(String, String)> = Vec::new();
let mut regular_callbacks: Vec<&String> = Vec::new();
if !request.callback_src.is_empty() {
for src in &request.callback_src {
match super::parse_callback_assignment(src) {
Ok(Some((command, variable))) => assignment_callbacks.push((command, variable)),
Ok(None) => regular_callbacks.push(src),
Err(err) => {
return Err(RequestExecutionError::with_artifact(
err.to_string(),
artifact.clone(),
)
.with_sensitive_values(collect_runtime_sensitive_values(
request,
inherited_sensitive_values,
&artifact,
&export_env,
)))
}
}
}
}
for (command, variable) in assignment_callbacks {
let output = parser::eval_shell_script(&command, &request.working_dir, Some(export_env.clone()));
let sanitized = super::sanitize_callback_assignment_output(&output);
export_env.insert(variable, sanitized);
}
let runtime_sensitive_values = collect_runtime_sensitive_values(
request,
inherited_sensitive_values,
&artifact,
&export_env,
);
let sensitive_export_values = collect_sensitive_export_values(request, &export_env);
let mut assertion_results = Vec::with_capacity(request.assertions.len());
for assertion in &request.assertions {
let should_execute = match assertion.should_execute(
&export_env,
¤t_snapshot,
&dependency_snapshots,
) {
Ok(should_execute) => should_execute,
Err(err) => {
let message = err.to_string();
assertion_results.push(AssertionOutcome::failed(
assertion.raw.clone(),
message.clone(),
err.1.clone(),
err.2.clone(),
));
return Err(RequestExecutionError::with_assertions_and_artifact(
message,
assertion_results,
artifact.clone(),
)
.with_sensitive_values(runtime_sensitive_values.clone()));
}
};
if !should_execute {
log::debug!("Skipping assertion '{}' due to guard", assertion.raw);
assertion_results.push(AssertionOutcome::skipped(
assertion.raw.clone(),
"guard evaluated to false",
));
continue;
}
if let Err(err) = assertion.evaluate(&export_env, ¤t_snapshot, &dependency_snapshots)
{
let message = err.to_string();
assertion_results.push(AssertionOutcome::failed(
assertion.raw.clone(),
message.clone(),
err.1.clone(),
err.2.clone(),
));
return Err(RequestExecutionError::with_assertions_and_artifact(
message,
assertion_results,
artifact.clone(),
)
.with_sensitive_values(runtime_sensitive_values.clone()));
}
assertion_results.push(AssertionOutcome::passed(assertion.raw.clone()));
if let Some(callback) = observer {
callback(ExecutionEvent::AssertionPassed {
request: request.description.clone(),
assertion: assertion.raw.clone(),
});
}
}
let mut callback_resp: Vec<String> = vec![];
if !regular_callbacks.is_empty() {
for src in regular_callbacks {
let output = parser::eval_shell_script(src, &request.working_dir, Some(export_env.clone()));
callback_resp.push(output);
}
log::debug!("CALLBACK RESPONSE\n{:#?}", callback_resp);
}
let output = if !callback_resp.is_empty() {
callback_resp.join("\n")
} else {
artifact.body.clone()
};
let final_export_env = super::apply_map_suffix_to_exports(export_env, request.map_iteration.as_ref());
Ok(RequestExecution {
output,
export_env: final_export_env,
artifact,
assertions: assertion_results,
sensitive_values: runtime_sensitive_values,
sensitive_export_values,
})
}
/// Gathers every value that must be treated as sensitive for this execution.
///
/// Sources, in order: the request's own sensitive values, the inherited ones,
/// non-empty values in `env` whose name the request's redaction rules mark for
/// redaction, and non-empty values resolved from the rules' additional body
/// paths against a snapshot of `artifact` (paths that fail to resolve are
/// skipped). The result is passed through `dedupe_sensitive_values`.
fn collect_runtime_sensitive_values(
    request: &Request,
    inherited_sensitive_values: &[String],
    artifact: &ExecutionArtifact,
    env: &HashMap<String, String>,
) -> Vec<String> {
    let mut collected = request.sensitive_values.clone();
    collected.extend_from_slice(inherited_sensitive_values);

    collected.extend(
        env.iter()
            .filter(|&(name, value)| {
                request.redaction_rules.should_redact_export_name(name) && !value.is_empty()
            })
            .map(|(_, value)| value.clone()),
    );

    let snapshot = ResponseSnapshot::from(artifact);
    for raw_path in request.redaction_rules.additional_body_paths() {
        match resolve_redaction_body_value(raw_path, &snapshot, env) {
            Ok(resolved) if !resolved.is_empty() => collected.push(resolved),
            _ => {}
        }
    }

    dedupe_sensitive_values(collected)
}
/// Returns the non-empty values of `export_env` whose variable name the
/// request's redaction rules mark for redaction, passed through
/// `dedupe_sensitive_values`.
fn collect_sensitive_export_values(
    request: &Request,
    export_env: &HashMap<String, String>,
) -> Vec<String> {
    let redacted: Vec<String> = export_env
        .iter()
        .filter(|&(name, value)| {
            request.redaction_rules.should_redact_export_name(name) && !value.is_empty()
        })
        .map(|(_, value)| value.clone())
        .collect();
    dedupe_sensitive_values(redacted)
}
/// Drops empty strings, orders the rest longest-first (ties broken
/// lexicographically) and removes duplicates.
fn dedupe_sensitive_values(mut values: Vec<String>) -> Vec<String> {
    values.retain(|candidate| !candidate.is_empty());
    // Tuple comparison: descending length first, then ascending text. Equal
    // strings end up adjacent, which is what `dedup` needs.
    values.sort_by(|left, right| {
        (right.len(), left.as_str()).cmp(&(left.len(), right.as_str()))
    });
    values.dedup();
    values
}
/// Opens an SSE stream for the request's session and registers it.
///
/// The open request is sent with a DNS-timed client. The response must have a
/// success status and pass `is_event_stream_response`; otherwise its body is
/// read (NUL characters removed) and included in the returned error. On
/// success a background task pumps the response into an unbounded channel,
/// the session registry entry is upserted with the receiver and the task's
/// abort handle, and an artifact with an empty body plus `dns` / `streamOpen`
/// timing phases is passed to `finalize_execution` together with the
/// `SSE_ACTION` and `SSE_SESSION` exports.
///
/// # Errors
///
/// Fails when the request has no session name, the client or request cannot
/// be built, the HTTP call fails, the status is not a success, or the response
/// is not an event stream. Errors from `finalize_execution` are propagated.
async fn execute_sse_open(
    request: &Request,
    operation: &SseOperation,
    context_map: &HashMap<String, String>,
    inherited_sensitive_values: &[String],
    dependency_artifacts: &HashMap<String, ExecutionArtifact>,
    observer: Option<&ExecutionObserver>,
    sessions: &SessionRegistry,
) -> Result<RequestExecution, RequestExecutionError> {
    let session_name = request.session_name.as_ref().ok_or_else(|| {
        RequestExecutionError::new("SSE open steps require a session name")
    })?;
    let (client, dns_timing) = build_timed_reqwest_client()?;
    let open_operation = materialize_sse_open_http_operation(&operation.http);
    let built_request = build_reqwest_request(&open_operation, &client, context_map)
        .map_err(|err| RequestExecutionError::new(err.to_string()))?;

    let mut request_attributes = HashMap::new();
    request_attributes.insert("url".to_string(), built_request.url().to_string());
    request_attributes.insert("method".to_string(), built_request.method().as_str().to_string());
    request_attributes.insert("action".to_string(), "open".to_string());
    request_attributes.insert("sessionName".to_string(), session_name.clone());
    let request_body = built_request
        .body()
        .and_then(|body| body.as_bytes())
        .map(|body| String::from_utf8_lossy(body).into_owned())
        .unwrap_or_default();
    log::debug!("SSE OPEN REQUEST\n{:#?}", built_request);

    let stream_open_timer = Instant::now();
    let response = client
        .execute(built_request)
        .await
        .map_err(|err| RequestExecutionError::new(err.to_string()))?;
    let stream_open_duration = stream_open_timer.elapsed();
    let status = response.status();
    let headers = response.headers().clone();

    if !status.is_success() {
        // Best effort: an unreadable body is reported as empty.
        let response_text = response
            .text()
            .await
            .unwrap_or_default()
            .replace('\0', "");
        return Err(RequestExecutionError::new(format!(
            "SSE stream open returned unexpected HTTP status {}{}",
            status.as_u16(),
            render_body_suffix(response_text.as_str())
        )));
    }
    if !is_event_stream_response(&headers) {
        // Best effort: an unreadable body is reported as empty.
        let response_text = response
            .text()
            .await
            .unwrap_or_default()
            .replace('\0', "");
        let content_type = headers
            .get(reqwest::header::CONTENT_TYPE)
            .and_then(|value| value.to_str().ok())
            .unwrap_or("<missing>");
        // The message is emitted as-is; no trailing characters are trimmed, so
        // a body that ends in a comma is reported faithfully.
        return Err(RequestExecutionError::new(format!(
            "SSE stream open expected 'text/event-stream' but received '{}'{}",
            content_type,
            render_body_suffix(response_text.as_str())
        )));
    }

    let (sender, receiver) = mpsc::unbounded_channel();
    let reader = tokio::spawn(async move {
        pump_sse_response(response, sender).await;
    });
    let metadata = build_sse_session_metadata(&open_operation, status, &headers);
    sessions.upsert_sse(
        session_name.clone(),
        metadata,
        receiver,
        reader.abort_handle(),
    );

    let mut response_attributes = HashMap::new();
    response_attributes.insert("status".to_string(), status.as_u16().to_string());
    let mut artifact = ExecutionArtifact::new_http(
        status,
        headers.clone(),
        String::new(),
        None,
        vec![
            ArtifactTranscript {
                direction: ArtifactTranscriptDirection::Outgoing,
                label: "sse.request".to_string(),
                body: request_body,
                attributes: request_attributes,
            },
            ArtifactTranscript {
                direction: ArtifactTranscriptDirection::Incoming,
                label: "sse.response".to_string(),
                body: String::new(),
                attributes: response_attributes,
            },
        ],
    );
    artifact.protocol = RequestProtocol::Sse;
    artifact.timing_phases = dns_timing.phases();
    artifact
        .timing_phases
        .push(ArtifactTimingPhase::new("streamOpen", stream_open_duration));

    let mut extra_exports = HashMap::new();
    extra_exports.insert("SSE_ACTION".to_string(), "open".to_string());
    extra_exports.insert("SSE_SESSION".to_string(), session_name.clone());
    finalize_execution(
        request,
        artifact,
        context_map,
        inherited_sensitive_values,
        dependency_artifacts,
        observer,
        extra_exports,
    )
}
/// Opens a WebSocket connection for the request's session and registers it.
///
/// After the handshake the stream is split: one spawned task pumps incoming
/// frames into an unbounded channel and another drives the writer from an
/// outgoing channel. The session registry entry is upserted with both channel
/// ends and both abort handles, and an artifact with empty bodies and a single
/// `handshake` timing phase is passed to `finalize_execution` together with
/// the `WS_ACTION` and `WS_SESSION` exports.
async fn execute_ws_open(
    request: &Request,
    operation: &WsOperation,
    context_map: &HashMap<String, String>,
    inherited_sensitive_values: &[String],
    dependency_artifacts: &HashMap<String, ExecutionArtifact>,
    observer: Option<&ExecutionObserver>,
    sessions: &SessionRegistry,
) -> Result<RequestExecution, RequestExecutionError> {
    let session_name = match request.session_name.as_ref() {
        Some(name) => name,
        None => {
            return Err(RequestExecutionError::new(
                "WebSocket open steps require a session name",
            ))
        }
    };
    let handshake_request = build_ws_handshake_request(&operation.http, context_map)
        .map_err(|err| RequestExecutionError::new(err.to_string()))?;
    let request_url = handshake_request.uri().to_string();

    let request_attributes = HashMap::from([
        ("url".to_string(), request_url.clone()),
        (
            "method".to_string(),
            operation.http.method.as_str().to_string(),
        ),
        ("action".to_string(), "open".to_string()),
        ("sessionName".to_string(), session_name.clone()),
    ]);
    log::debug!("WS OPEN REQUEST\n{:#?}", handshake_request);

    let handshake_timer = Instant::now();
    let (ws_stream, handshake_response) = connect_async(handshake_request)
        .await
        .map_err(|err| RequestExecutionError::new(err.to_string()))?;
    let handshake_duration = handshake_timer.elapsed();
    let status = handshake_response.status();
    let headers = handshake_response.headers().clone();

    // Reading and writing run on separate tasks connected through channels.
    let (sink, stream) = ws_stream.split();
    let (incoming_tx, incoming_rx) = mpsc::unbounded_channel();
    let (outgoing_tx, outgoing_rx) = mpsc::unbounded_channel();
    let reader_task = tokio::spawn(async move {
        pump_ws_stream(stream, incoming_tx).await;
    });
    let writer_task = tokio::spawn(async move {
        drive_ws_writer(sink, outgoing_rx).await;
    });

    let metadata = build_ws_session_metadata(request_url.as_str(), status, &headers);
    sessions.upsert_ws(
        session_name.clone(),
        metadata,
        incoming_rx,
        outgoing_tx,
        reader_task.abort_handle(),
        writer_task.abort_handle(),
    );

    let response_attributes =
        HashMap::from([("status".to_string(), status.as_u16().to_string())]);
    let outgoing_transcript = ArtifactTranscript {
        direction: ArtifactTranscriptDirection::Outgoing,
        label: "ws.request".to_string(),
        body: String::new(),
        attributes: request_attributes,
    };
    let incoming_transcript = ArtifactTranscript {
        direction: ArtifactTranscriptDirection::Incoming,
        label: "ws.response".to_string(),
        body: String::new(),
        attributes: response_attributes,
    };
    let mut artifact = ExecutionArtifact::new_http(
        status,
        headers.clone(),
        String::new(),
        None,
        vec![outgoing_transcript, incoming_transcript],
    );
    artifact.protocol = RequestProtocol::Ws;
    artifact.timing_phases = vec![ArtifactTimingPhase::new("handshake", handshake_duration)];

    let extra_exports = HashMap::from([
        ("WS_ACTION".to_string(), "open".to_string()),
        ("WS_SESSION".to_string(), session_name.clone()),
    ]);
    finalize_execution(
        request,
        artifact,
        context_map,
        inherited_sensitive_values,
        dependency_artifacts,
        observer,
        extra_exports,
    )
}
/// Sends one payload over an already-open WebSocket session.
///
/// The session must exist, belong to the `ws` protocol and still have its
/// runtime handle. `payload` is resolved against `context_map` and passed
/// through prompt injection before being sent. The resulting artifact reuses
/// the handshake status and headers stored in the session metadata, carries a
/// single outgoing `ws.send` transcript and a `send` timing phase, and is
/// finalized with the `WS_ACTION`, `WS_SESSION` and `WS_KIND` exports.
async fn execute_ws_send(
    request: &Request,
    operation: &WsOperation,
    kind: &str,
    payload: &str,
    context_map: &HashMap<String, String>,
    inherited_sensitive_values: &[String],
    dependency_artifacts: &HashMap<String, ExecutionArtifact>,
    observer: Option<&ExecutionObserver>,
    sessions: &SessionRegistry,
) -> Result<RequestExecution, RequestExecutionError> {
    let session_name = match request.session_name.as_ref() {
        Some(name) => name,
        None => {
            return Err(RequestExecutionError::new(
                "WebSocket send steps require a session name",
            ))
        }
    };
    let session = match sessions.get(session_name) {
        Some(session) => session,
        None => {
            return Err(RequestExecutionError::new(format!(
                "WebSocket session '{}' has not been opened",
                session_name
            )))
        }
    };
    if session.protocol != RequestProtocol::Ws {
        return Err(RequestExecutionError::new(format!(
            "session '{}' belongs to protocol '{}' not 'ws'",
            session_name,
            session.protocol.as_str()
        )));
    }
    let runtime = match session.ws_runtime() {
        Some(runtime) => runtime,
        None => {
            return Err(RequestExecutionError::new(format!(
                "WebSocket session '{}' is missing its runtime stream handle",
                session_name
            )))
        }
    };

    let resolved_payload = context::resolve_with_context(payload, context_map);
    let payload = context::try_inject_from_prompt(&resolved_payload)
        .map_err(|err| RequestExecutionError::new(err.to_string()))?;

    let send_timer = Instant::now();
    runtime
        .send(payload.clone())
        .await
        .map_err(RequestExecutionError::new)?;
    let send_duration = send_timer.elapsed();

    // Status, headers and URL come from the handshake recorded at open time.
    let (status, headers, url) = ws_handshake_from_session(&session.metadata)
        .map_err(|err| RequestExecutionError::new(err.to_string()))?;
    let outgoing_attributes = HashMap::from([
        ("action".to_string(), "send".to_string()),
        ("kind".to_string(), kind.to_string()),
        ("sessionName".to_string(), session_name.clone()),
        ("url".to_string(), url),
        (
            "method".to_string(),
            operation.http.method.as_str().to_string(),
        ),
    ]);
    let send_transcript = ArtifactTranscript {
        direction: ArtifactTranscriptDirection::Outgoing,
        label: "ws.send".to_string(),
        body: payload,
        attributes: outgoing_attributes,
    };
    let mut artifact = ExecutionArtifact::new_http(
        status,
        headers,
        String::new(),
        None,
        vec![send_transcript],
    );
    artifact.protocol = RequestProtocol::Ws;
    artifact.timing_phases = vec![ArtifactTimingPhase::new("send", send_duration)];

    let extra_exports = HashMap::from([
        ("WS_ACTION".to_string(), "send".to_string()),
        ("WS_SESSION".to_string(), session_name.clone()),
        ("WS_KIND".to_string(), kind.to_string()),
    ]);
    finalize_execution(
        request,
        artifact,
        context_map,
        inherited_sensitive_values,
        dependency_artifacts,
        observer,
        extra_exports,
    )
}
async fn execute_ws_receive(
request: &Request,
operation: &WsOperation,
within: &str,
context_map: &HashMap<String, String>,
inherited_sensitive_values: &[String],
dependency_artifacts: &HashMap<String, ExecutionArtifact>,
observer: Option<&ExecutionObserver>,
sessions: &SessionRegistry,
) -> Result<RequestExecution, RequestExecutionError> {
let session_name = request
.session_name
.as_ref()
.ok_or_else(|| RequestExecutionError::new("WebSocket receive steps require a session name"))?;
let session = sessions.get(session_name).ok_or_else(|| {
RequestExecutionError::new(format!("WebSocket session '{}' has not been opened", session_name))
})?;
if session.protocol != RequestProtocol::Ws {
return Err(RequestExecutionError::new(format!(
"session '{}' belongs to protocol '{}' not 'ws'",
session_name,
session.protocol.as_str()
)));
}
let runtime = session.ws_runtime().ok_or_else(|| {
RequestExecutionError::new(format!(
"WebSocket session '{}' is missing its runtime stream handle",
session_name
))
})?;
let wait_timer = Instant::now();
let message = receive_next_ws_message(runtime.as_ref(), session_name, within).await?;
let wait_duration = wait_timer.elapsed();
let (status, headers, url) = ws_handshake_from_session(&session.metadata)
.map_err(|err| RequestExecutionError::new(err.to_string()))?;
let mut outgoing_attributes = HashMap::new();
outgoing_attributes.insert("action".to_string(), "receive".to_string());
outgoing_attributes.insert("within".to_string(), within.to_string());
outgoing_attributes.insert("sessionName".to_string(), session_name.clone());
outgoing_attributes.insert("url".to_string(), url);
outgoing_attributes.insert(
"method".to_string(),
operation.http.method.as_str().to_string(),
);
let mut incoming_attributes = HashMap::new();
incoming_attributes.insert("status".to_string(), status.as_u16().to_string());
incoming_attributes.insert("kind".to_string(), message.kind.as_str().to_string());
let mut response_headers = headers;
response_headers.insert(
reqwest::header::HeaderName::from_static(INTERNAL_WS_KIND_HEADER),
parse_header_value(
message.kind.as_str().to_string(),
INTERNAL_WS_KIND_HEADER.to_string(),
)
.map_err(|err| RequestExecutionError::new(err.to_string()))?,
);
let mut artifact = ExecutionArtifact::new_http(
status,
response_headers,
message.data.clone(),
serde_json::from_str::<Value>(&message.data).ok(),
vec![
ArtifactTranscript {
direction: ArtifactTranscriptDirection::Outgoing,
label: "ws.receive".to_string(),
body: String::new(),
attributes: outgoing_attributes,
},
ArtifactTranscript {
direction: ArtifactTranscriptDirection::Incoming,
label: "ws.message".to_string(),
body: message.raw_message.clone(),
attributes: incoming_attributes,
},
],
);
artifact.protocol = RequestProtocol::Ws;
artifact.timing_phases = vec![ArtifactTimingPhase::new("wait", wait_duration)];
let mut extra_exports = HashMap::new();
extra_exports.insert("WS_ACTION".to_string(), "receive".to_string());
extra_exports.insert("WS_SESSION".to_string(), session_name.clone());
extra_exports.insert("WS_KIND".to_string(), message.kind.as_str().to_string());
finalize_execution(
request,
artifact,
context_map,
inherited_sensitive_values,
dependency_artifacts,
observer,
extra_exports,
)
}
async fn execute_ws_exchange(
request: &Request,
operation: &WsOperation,
kind: &str,
payload: &str,
within: &str,
context_map: &HashMap<String, String>,
inherited_sensitive_values: &[String],
dependency_artifacts: &HashMap<String, ExecutionArtifact>,
observer: Option<&ExecutionObserver>,
sessions: &SessionRegistry,
) -> Result<RequestExecution, RequestExecutionError> {
let session_name = request
.session_name
.as_ref()
.ok_or_else(|| RequestExecutionError::new("WebSocket exchange steps require a session name"))?;
let session = sessions.get(session_name).ok_or_else(|| {
RequestExecutionError::new(format!("WebSocket session '{}' has not been opened", session_name))
})?;
if session.protocol != RequestProtocol::Ws {
return Err(RequestExecutionError::new(format!(
"session '{}' belongs to protocol '{}' not 'ws'",
session_name,
session.protocol.as_str()
)));
}
let runtime = session.ws_runtime().ok_or_else(|| {
RequestExecutionError::new(format!(
"WebSocket session '{}' is missing its runtime stream handle",
session_name
))
})?;
let payload = context::try_inject_from_prompt(&context::resolve_with_context(payload, context_map))
.map_err(|err| RequestExecutionError::new(err.to_string()))?;
let send_timer = Instant::now();
runtime
.send(payload.clone())
.await
.map_err(RequestExecutionError::new)?;
let send_duration = send_timer.elapsed();
let wait_timer = Instant::now();
let message = receive_next_ws_message(runtime.as_ref(), session_name, within).await?;
let wait_duration = wait_timer.elapsed();
let (status, headers, url) = ws_handshake_from_session(&session.metadata)
.map_err(|err| RequestExecutionError::new(err.to_string()))?;
let mut outgoing_attributes = HashMap::new();
outgoing_attributes.insert("action".to_string(), "exchange".to_string());
outgoing_attributes.insert("kind".to_string(), kind.to_string());
outgoing_attributes.insert("within".to_string(), within.to_string());
outgoing_attributes.insert("sessionName".to_string(), session_name.clone());
outgoing_attributes.insert("url".to_string(), url);
outgoing_attributes.insert(
"method".to_string(),
operation.http.method.as_str().to_string(),
);
let mut incoming_attributes = HashMap::new();
incoming_attributes.insert("status".to_string(), status.as_u16().to_string());
incoming_attributes.insert("kind".to_string(), message.kind.as_str().to_string());
let mut response_headers = headers;
response_headers.insert(
reqwest::header::HeaderName::from_static(INTERNAL_WS_KIND_HEADER),
parse_header_value(
message.kind.as_str().to_string(),
INTERNAL_WS_KIND_HEADER.to_string(),
)
.map_err(|err| RequestExecutionError::new(err.to_string()))?,
);
let mut artifact = ExecutionArtifact::new_http(
status,
response_headers,
message.data.clone(),
serde_json::from_str::<Value>(&message.data).ok(),
vec![
ArtifactTranscript {
direction: ArtifactTranscriptDirection::Outgoing,
label: "ws.exchange".to_string(),
body: payload,
attributes: outgoing_attributes,
},
ArtifactTranscript {
direction: ArtifactTranscriptDirection::Incoming,
label: "ws.message".to_string(),
body: message.raw_message.clone(),
attributes: incoming_attributes,
},
],
);
artifact.protocol = RequestProtocol::Ws;
artifact.timing_phases = vec![
ArtifactTimingPhase::new("send", send_duration),
ArtifactTimingPhase::new("wait", wait_duration),
];
let mut extra_exports = HashMap::new();
extra_exports.insert("WS_ACTION".to_string(), "exchange".to_string());
extra_exports.insert("WS_SESSION".to_string(), session_name.clone());
extra_exports.insert("WS_KIND".to_string(), message.kind.as_str().to_string());
finalize_execution(
request,
artifact,
context_map,
inherited_sensitive_values,
dependency_artifacts,
observer,
extra_exports,
)
}
/// Awaits the next message from a WebSocket session runtime, bounded by the
/// duration parsed from `within`.
///
/// # Errors
///
/// Returns an error when `within` cannot be parsed, when the wait times out,
/// or when the runtime yields `None` (reported as the session having closed).
async fn receive_next_ws_message(
    runtime: &crate::request::session::WsSessionRuntime,
    session_name: &str,
    within: &str,
) -> Result<crate::request::session::WsMessage, RequestExecutionError> {
    let deadline = parse_within_duration(within).map_err(RequestExecutionError::new)?;
    let Ok(received) = timeout(deadline, runtime.recv()).await else {
        return Err(RequestExecutionError::new(format!(
            "timed out waiting {} for WebSocket message on session '{}'",
            within, session_name
        )));
    };
    received.ok_or_else(|| {
        RequestExecutionError::new(format!(
            "WebSocket session '{}' closed before a message was received",
            session_name
        ))
    })
}
async fn execute_sse_receive(
request: &Request,
operation: &SseOperation,
within: &str,
context_map: &HashMap<String, String>,
inherited_sensitive_values: &[String],
dependency_artifacts: &HashMap<String, ExecutionArtifact>,
observer: Option<&ExecutionObserver>,
sessions: &SessionRegistry,
) -> Result<RequestExecution, RequestExecutionError> {
let session_name = request.session_name.as_ref().ok_or_else(|| {
RequestExecutionError::new("SSE receive steps require a session name")
})?;
let session = sessions.get(session_name).ok_or_else(|| {
RequestExecutionError::new(format!("SSE session '{}' has not been opened", session_name))
})?;
if session.protocol != RequestProtocol::Sse {
return Err(RequestExecutionError::new(format!(
"session '{}' belongs to protocol '{}' not 'sse'",
session_name,
session.protocol.as_str()
)));
}
let runtime = session.sse_runtime().ok_or_else(|| {
RequestExecutionError::new(format!(
"SSE session '{}' is missing its runtime stream handle",
session_name
))
})?;
let wait_duration = parse_within_duration(within).map_err(RequestExecutionError::new)?;
let wait_timer = Instant::now();
let message = match timeout(wait_duration, runtime.recv()).await {
Ok(Some(message)) => message,
Ok(None) => {
return Err(RequestExecutionError::new(format!(
"SSE stream '{}' closed before an event was received",
session_name
)))
}
Err(_) => {
return Err(RequestExecutionError::new(format!(
"timed out waiting {} for SSE event on session '{}'",
within, session_name
)))
}
};
let wait_phase_duration = wait_timer.elapsed();
let (status, headers) = sse_handshake_from_session(&session.metadata)
.map_err(|err| RequestExecutionError::new(err.to_string()))?;
let mut outgoing_attributes = HashMap::new();
outgoing_attributes.insert("action".to_string(), "receive".to_string());
outgoing_attributes.insert("within".to_string(), within.to_string());
outgoing_attributes.insert("sessionName".to_string(), session_name.clone());
outgoing_attributes.insert("url".to_string(), operation.http.url.clone());
outgoing_attributes.insert(
"method".to_string(),
operation.http.method.as_str().to_string(),
);
let mut incoming_attributes = HashMap::new();
incoming_attributes.insert("status".to_string(), status.as_u16().to_string());
if let Some(event) = &message.event {
incoming_attributes.insert("event".to_string(), event.clone());
}
if let Some(id) = &message.id {
incoming_attributes.insert("id".to_string(), id.clone());
}
let mut response_headers = headers;
if let Some(event) = &message.event {
response_headers.insert(
reqwest::header::HeaderName::from_static(INTERNAL_SSE_EVENT_HEADER),
parse_header_value(event.clone(), INTERNAL_SSE_EVENT_HEADER.to_string())
.map_err(|err| RequestExecutionError::new(err.to_string()))?,
);
}
if let Some(id) = &message.id {
response_headers.insert(
reqwest::header::HeaderName::from_static(INTERNAL_SSE_ID_HEADER),
parse_header_value(id.clone(), INTERNAL_SSE_ID_HEADER.to_string())
.map_err(|err| RequestExecutionError::new(err.to_string()))?,
);
}
let mut artifact = ExecutionArtifact::new_http(
status,
response_headers,
message.data.clone(),
serde_json::from_str::<Value>(&message.data).ok(),
vec![
ArtifactTranscript {
direction: ArtifactTranscriptDirection::Outgoing,
label: "sse.receive".to_string(),
body: String::new(),
attributes: outgoing_attributes,
},
ArtifactTranscript {
direction: ArtifactTranscriptDirection::Incoming,
label: "sse.event".to_string(),
body: message.raw_event.clone(),
attributes: incoming_attributes,
},
],
);
artifact.protocol = RequestProtocol::Sse;
artifact.timing_phases = vec![ArtifactTimingPhase::new("wait", wait_phase_duration)];
let mut extra_exports = HashMap::new();
extra_exports.insert("SSE_ACTION".to_string(), "receive".to_string());
extra_exports.insert("SSE_SESSION".to_string(), session_name.clone());
if let Some(event) = &message.event {
extra_exports.insert("SSE_EVENT".to_string(), event.clone());
}
if let Some(id) = &message.id {
extra_exports.insert("SSE_ID".to_string(), id.clone());
}
finalize_execution(
request,
artifact,
context_map,
inherited_sensitive_values,
dependency_artifacts,
observer,
extra_exports,
)
}
/// Reads an SSE HTTP response chunk by chunk, parses it into events, and
/// forwards each event to `sender`.
///
/// The loop stops when the receiver side of `sender` is gone, when the body
/// ends (any pending event held by the parser is flushed first), or when a
/// read error occurs (logged at debug level).
///
/// Network chunks can end in the middle of a multi-byte UTF-8 character, so
/// undecodable trailing bytes are kept in `pending_bytes` and prepended to the
/// next chunk instead of being decoded lossily per chunk.
async fn pump_sse_response(
    mut response: reqwest::Response,
    sender: mpsc::UnboundedSender<SseMessage>,
) {
    let mut parser = SseEventParser::default();
    let mut pending_bytes: Vec<u8> = Vec::new();
    loop {
        match response.chunk().await {
            Ok(Some(chunk)) => {
                pending_bytes.extend_from_slice(&chunk);
                let text = decode_sse_utf8_prefix(&mut pending_bytes);
                for message in parser.push(&text) {
                    if sender.send(message).is_err() {
                        return;
                    }
                }
            }
            Ok(None) => {
                // A truncated character at end of stream can never complete;
                // decode whatever is left lossily so no bytes are dropped.
                if !pending_bytes.is_empty() {
                    let tail = String::from_utf8_lossy(&pending_bytes).into_owned();
                    pending_bytes.clear();
                    for message in parser.push(&tail) {
                        if sender.send(message).is_err() {
                            return;
                        }
                    }
                }
                if let Some(message) = parser.finish() {
                    let _ = sender.send(message);
                }
                return;
            }
            Err(err) => {
                log::debug!("SSE stream read error: {}", err);
                return;
            }
        }
    }
}

/// Decodes as much of `pending` as forms complete UTF-8 and removes those
/// bytes from it.
///
/// Invalid sequences are replaced with U+FFFD, as `String::from_utf8_lossy`
/// would do. An incomplete sequence at the very end is left in `pending` so it
/// can be completed by the next chunk.
fn decode_sse_utf8_prefix(pending: &mut Vec<u8>) -> String {
    let mut decoded = String::new();
    loop {
        let error = match std::str::from_utf8(pending.as_slice()) {
            Ok(valid) => {
                decoded.push_str(valid);
                pending.clear();
                return decoded;
            }
            Err(error) => error,
        };
        let valid_len = error.valid_up_to();
        decoded.push_str(&String::from_utf8_lossy(&pending[..valid_len]));
        match error.error_len() {
            // Definitely invalid bytes: replace them and keep decoding.
            Some(invalid_len) => {
                decoded.push(char::REPLACEMENT_CHARACTER);
                pending.drain(..valid_len + invalid_len);
            }
            // Input ended mid-character: wait for more bytes.
            None => {
                pending.drain(..valid_len);
                return decoded;
            }
        }
    }
}
/// Incremental parser for a `text/event-stream` body.
///
/// Text is fed in arbitrary pieces through [`SseEventParser::push`]; complete
/// events are returned as soon as their terminating blank line is seen.
/// [`SseEventParser::finish`] flushes whatever is still pending when the
/// stream ends.
#[derive(Default)]
struct SseEventParser {
    /// Text received so far that has not yet been terminated by `\n`.
    buffer: String,
    /// Value of the most recent `event` field of the event being assembled.
    event: Option<String>,
    /// Value of the most recent `id` field of the event being assembled.
    id: Option<String>,
    /// Values of the `data` fields of the event being assembled, in order.
    data_lines: Vec<String>,
    /// Every non-empty line (comments included) of the event being assembled.
    raw_lines: Vec<String>,
}

impl SseEventParser {
    /// Appends `chunk` to the buffered text and returns every event completed
    /// by it. A trailing partial line stays buffered for the next call.
    fn push(&mut self, chunk: &str) -> Vec<SseMessage> {
        self.buffer.push_str(chunk);
        // Take the buffer so lines can be borrowed from it while `self` is
        // mutated, and so consumed text is removed once instead of per line.
        let mut buffered = std::mem::take(&mut self.buffer);
        let mut messages = Vec::new();
        let mut consumed = 0;
        while let Some(offset) = buffered[consumed..].find('\n') {
            let line_end = consumed + offset;
            let line = &buffered[consumed..line_end];
            let line = line.strip_suffix('\r').unwrap_or(line);
            if let Some(message) = self.process_line(line.to_string()) {
                messages.push(message);
            }
            consumed = line_end + 1;
        }
        buffered.drain(..consumed);
        self.buffer = buffered;
        messages
    }

    /// Flushes the parser at end of stream.
    ///
    /// Any unterminated final line is parsed first, then the pending event (if
    /// it has any data, event name, or id) is emitted. Without the second step
    /// a stream whose last field line lacks a trailing newline would lose its
    /// final event, although a newline-terminated one is emitted.
    fn finish(&mut self) -> Option<SseMessage> {
        if self.buffer.is_empty() {
            return self.finalize_event();
        }
        let mut line = std::mem::take(&mut self.buffer);
        if line.ends_with('\r') {
            line.pop();
        }
        match self.process_line(line) {
            Some(message) => Some(message),
            None => self.finalize_event(),
        }
    }

    /// Handles one line with its terminator already removed.
    ///
    /// An empty line completes the current event. Lines starting with `:` are
    /// comments and are only kept in the raw text. Otherwise the line is split
    /// at the first `:` into field and value (one leading space of the value
    /// is dropped); `event`, `data`, and `id` are recorded, other fields
    /// ignored.
    fn process_line(&mut self, line: String) -> Option<SseMessage> {
        if line.is_empty() {
            return self.finalize_event();
        }
        self.raw_lines.push(line.clone());
        if line.starts_with(':') {
            return None;
        }
        let (field, value) = match line.split_once(':') {
            Some((field, value)) => (field, value.strip_prefix(' ').unwrap_or(value)),
            None => (line.as_str(), ""),
        };
        match field {
            "event" => self.event = Some(value.to_string()),
            "data" => self.data_lines.push(value.to_string()),
            "id" => self.id = Some(value.to_string()),
            _ => {}
        }
        None
    }

    /// Emits the event assembled so far and resets the per-event state.
    ///
    /// Returns `None` (after discarding collected raw lines) when no data,
    /// event name, or id was seen. The event name defaults to `"message"`.
    fn finalize_event(&mut self) -> Option<SseMessage> {
        if self.data_lines.is_empty() && self.event.is_none() && self.id.is_none() {
            self.raw_lines.clear();
            return None;
        }
        let data = self.data_lines.join("\n");
        let raw_event = self.raw_lines.join("\n");
        self.data_lines.clear();
        self.raw_lines.clear();
        Some(SseMessage {
            event: Some(self.event.take().unwrap_or_else(|| "message".to_string())),
            id: self.id.take(),
            data,
            raw_event,
        })
    }
}
/// Returns a copy of `http_operation` prepared for opening an SSE stream.
///
/// If no header named `accept` (compared case-insensitively) is present,
/// `Accept: text/event-stream` is added; an existing one is left untouched.
fn materialize_sse_open_http_operation(http_operation: &HttpOperation) -> HttpOperation {
    let mut prepared = http_operation.clone();
    let has_accept = prepared
        .headers
        .keys()
        .any(|name| name.eq_ignore_ascii_case("accept"));
    if !has_accept {
        prepared
            .headers
            .insert("Accept".to_string(), "text/event-stream".to_string());
    }
    prepared
}
/// Builds the string metadata stored for an SSE session: the operation URL,
/// the numeric handshake status, and the response headers as a JSON object
/// under `headersJson`.
///
/// # Panics
///
/// Panics if the header map produced by `header_map_to_json` fails to
/// serialize.
fn build_sse_session_metadata(
    open_operation: &HttpOperation,
    status: StatusCode,
    headers: &HeaderMap,
) -> HashMap<String, String> {
    let headers_json = serde_json::to_string(&header_map_to_json(headers))
        .expect("sse session headers should serialize");
    HashMap::from([
        ("url".to_string(), open_operation.url.clone()),
        ("status".to_string(), status.as_u16().to_string()),
        ("headersJson".to_string(), headers_json),
    ])
}
fn sse_handshake_from_session(
metadata: &HashMap<String, String>,
) -> Result<(StatusCode, HeaderMap), Box<dyn Error>> {
let status = metadata
.get("status")
.ok_or_else(|| "SSE session is missing its handshake status".to_string())?
.parse::<u16>()?;
let headers = metadata
.get("headersJson")
.map(|raw| json_to_header_map(raw.as_str()))
.transpose()?
.unwrap_or_default();
Ok((
StatusCode::from_u16(status).map_err(|err| err.to_string())?,
headers,
))
}
/// Builds the string metadata stored for a WebSocket session: the request URL,
/// the numeric handshake status, and the handshake response headers as a JSON
/// object under `headersJson`.
///
/// # Panics
///
/// Panics if the header map produced by `header_map_to_json` fails to
/// serialize.
fn build_ws_session_metadata(
    request_url: &str,
    status: StatusCode,
    headers: &HeaderMap,
) -> HashMap<String, String> {
    let headers_json = serde_json::to_string(&header_map_to_json(headers))
        .expect("ws session headers should serialize");
    HashMap::from([
        ("url".to_string(), request_url.to_string()),
        ("status".to_string(), status.as_u16().to_string()),
        ("headersJson".to_string(), headers_json),
    ])
}
fn ws_handshake_from_session(
metadata: &HashMap<String, String>,
) -> Result<(StatusCode, HeaderMap, String), Box<dyn Error>> {
let status = metadata
.get("status")
.ok_or_else(|| "WebSocket session is missing its handshake status".to_string())?
.parse::<u16>()?;
let url = metadata
.get("url")
.cloned()
.ok_or_else(|| "WebSocket session is missing its handshake URL".to_string())?;
let headers = metadata
.get("headersJson")
.map(|raw| json_to_header_map(raw.as_str()))
.transpose()?
.unwrap_or_default();
Ok((
StatusCode::from_u16(status).map_err(|err| err.to_string())?,
headers,
url,
))
}
/// Flattens a header map into a name-to-value string map.
///
/// Repeated headers are combined into one entry separated by `", "`. A value
/// that cannot be read with `to_str` contributes an empty string.
fn header_map_to_json(headers: &HeaderMap) -> HashMap<String, String> {
    let mut flattened: HashMap<String, String> = HashMap::new();
    for (name, value) in headers.iter() {
        let text = value.to_str().unwrap_or_default();
        match flattened.get_mut(name.as_str()) {
            Some(combined) => {
                // No separator after an empty earlier value.
                if !combined.is_empty() {
                    combined.push_str(", ");
                }
                combined.push_str(text);
            }
            None => {
                flattened.insert(name.as_str().to_string(), text.to_string());
            }
        }
    }
    flattened
}
/// Parses a JSON object of string values into a header map.
///
/// # Errors
///
/// Fails when `raw` is not a JSON object of strings, or when any key or value
/// is rejected by `parse_header_name` / `parse_header_value`.
fn json_to_header_map(raw: &str) -> Result<HeaderMap, Box<dyn Error>> {
    let entries: HashMap<String, String> = serde_json::from_str(raw)?;
    let mut headers = HeaderMap::new();
    for (name, value) in entries {
        let header_name = parse_header_name(name.as_str())?;
        let header_value = parse_header_value(value, name)?;
        headers.insert(header_name, header_value);
    }
    Ok(headers)
}
/// Formats `body` as a `": <text>"` suffix.
///
/// Surrounding whitespace is trimmed; a body that is empty after trimming
/// yields an empty string.
fn render_body_suffix(body: &str) -> String {
    match body.trim() {
        "" => String::new(),
        content => format!(": {}", content),
    }
}
/// Writer loop of a WebSocket session: sends each queued command as a text
/// frame and reports the result through the command's completion handle.
///
/// The loop returns right after the first failed send or flush, once the
/// failure has been reported. When the command channel is closed instead, the
/// sink is closed and any error from closing is ignored.
async fn drive_ws_writer<S>(
    mut writer: SplitSink<WebSocketStream<S>, Message>,
    mut commands: mpsc::UnboundedReceiver<WsCommand>,
) where
    S: AsyncRead + AsyncWrite + Unpin,
{
    loop {
        let Some(command) = commands.recv().await else {
            break;
        };
        let sent = writer
            .send(Message::Text(command.payload.into()))
            .await
            .map_err(|err| err.to_string());
        let outcome = match sent {
            Ok(()) => writer.flush().await.map_err(|err| err.to_string()),
            failure => failure,
        };
        let failed = outcome.is_err();
        // The requester may have stopped waiting; that is not an error here.
        let _ = command.completion.send(outcome);
        if failed {
            return;
        }
    }
    let _ = writer.close().await;
}
/// Reader loop of a WebSocket session: forwards normalized inbound messages to
/// `sender`.
///
/// Frames that `normalize_ws_message` maps to `None` are skipped. The loop
/// ends when the stream is exhausted, a read error occurs (logged at debug
/// level), the receiving side of `sender` is gone, or a close message has been
/// forwarded.
async fn pump_ws_stream<S>(
    mut reader: SplitStream<WebSocketStream<S>>,
    sender: mpsc::UnboundedSender<WsMessage>,
) where
    S: AsyncRead + AsyncWrite + Unpin,
{
    while let Some(next) = reader.next().await {
        let frame = match next {
            Ok(frame) => frame,
            Err(err) => {
                log::debug!("WebSocket stream read error: {}", err);
                return;
            }
        };
        let Some(message) = normalize_ws_message(frame) else {
            continue;
        };
        let is_close = message.kind == WsMessageKind::Close;
        if sender.send(message).is_err() || is_close {
            return;
        }
    }
}
/// Converts a raw WebSocket frame into the session's message type.
///
/// * Text: `data` and `raw_message` are both the text.
/// * Binary: both are the rendering produced by `render_ws_binary_payload`.
/// * Close: `data` is the close reason (empty without a close frame) and
///   `raw_message` is `close`, `close code=N`, or `close code=N reason=R`.
/// * Ping, pong, and raw frames yield `None`.
fn normalize_ws_message(message: Message) -> Option<WsMessage> {
    let normalized = match message {
        Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => return None,
        Message::Text(text) => {
            let raw_message = text.to_string();
            WsMessage {
                kind: WsMessageKind::Text,
                data: raw_message.clone(),
                raw_message,
            }
        }
        Message::Binary(bytes) => {
            let raw_message = render_ws_binary_payload(bytes.as_ref());
            WsMessage {
                kind: WsMessageKind::Binary,
                data: raw_message.clone(),
                raw_message,
            }
        }
        Message::Close(frame) => {
            let (data, raw_message) = match frame.as_ref() {
                None => (String::new(), "close".to_string()),
                Some(frame) => {
                    let raw_message = if frame.reason.is_empty() {
                        format!("close code={}", u16::from(frame.code))
                    } else {
                        format!(
                            "close code={} reason={}",
                            u16::from(frame.code),
                            frame.reason
                        )
                    };
                    (frame.reason.to_string(), raw_message)
                }
            };
            WsMessage {
                kind: WsMessageKind::Close,
                data,
                raw_message,
            }
        }
    };
    Some(normalized)
}
/// Renders a binary WebSocket payload as text.
///
/// Valid UTF-8 is returned unchanged; anything else is rendered as lowercase
/// hexadecimal, two digits per byte, with no separators.
fn render_ws_binary_payload(bytes: &[u8]) -> String {
    if let Ok(text) = std::str::from_utf8(bytes) {
        return text.to_string();
    }
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut hex = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        hex.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        hex.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    hex
}
/// Builds the HTTP upgrade request used to open a WebSocket connection.
///
/// The URL, every query parameter value, and every header value are resolved
/// against `context_map` and then passed through prompt injection. The request
/// method is replaced with the operation's method, and the operation's headers
/// overwrite same-named headers of the generated client request.
///
/// # Errors
///
/// Fails when prompt injection fails, the URL does not parse, the client
/// request cannot be created from it, or a header name or value is invalid.
fn build_ws_handshake_request(
    operation: &HttpOperation,
    context_map: &HashMap<String, String>,
) -> Result<http::Request<()>, Box<dyn Error>> {
    let resolved_url = context::resolve_with_context(&operation.url, context_map);
    let resolved_url = context::try_inject_from_prompt(&resolved_url)?;
    let mut url = reqwest::Url::parse(&resolved_url)?;
    // `query_pairs_mut` gives the URL an (empty) query component as soon as it
    // is called, which would append a bare `?` to URLs without a query. Only
    // open the serializer when there is something to append.
    if !operation.query_params.is_empty() {
        let mut query_pairs = url.query_pairs_mut();
        for (key, value) in &operation.query_params {
            let resolved = context::resolve_with_context(value, context_map);
            let resolved = context::try_inject_from_prompt(&resolved)?;
            query_pairs.append_pair(key, &resolved);
        }
    }
    let mut request = url.as_str().into_client_request()?;
    *request.method_mut() = operation.method.clone();
    for (key, value) in &operation.headers {
        let resolved_value = context::resolve_with_context(value, context_map);
        request.headers_mut().insert(
            parse_header_name(key)?,
            parse_header_value(context::try_inject_from_prompt(&resolved_value)?, key.clone())?,
        );
    }
    Ok(request)
}
/// A GraphQL operation after its placeholders have been resolved, as produced
/// by `resolve_graphql_request`.
struct ResolvedGraphqlRequest {
    /// HTTP operation whose body is the JSON GraphQL payload.
    http_operation: HttpOperation,
    /// Resolved operation name, when the operation declares one.
    operation_name: Option<String>,
    /// Resolved GraphQL document, sent as the payload's `query`.
    document: String,
    /// Resolved variables as raw JSON text, when the operation declares any.
    variables_json: Option<String>,
}
/// Response body paired with its parsed JSON form, as returned by
/// `normalize_http_response_body`.
struct NormalizedHttpResponse {
    /// Body text; for an MCP response extracted from an event stream this is
    /// the serialized JSON-RPC response rather than the raw stream.
    body: String,
    /// Parsed JSON for `body`, or `None` when no JSON could be obtained.
    json: Option<Value>,
}
/// Parameters of an MCP `initialize` call with every optional value filled in.
struct ResolvedMcpInitialize {
    /// Sent as `params.protocolVersion`.
    protocol_version: String,
    /// Sent as `params.clientInfo.name`.
    client_name: String,
    /// Sent as `params.clientInfo.version`.
    client_version: String,
    /// Raw JSON text sent (after parsing) as `params.capabilities`.
    capabilities_json: String,
}
/// Parameters of an MCP `tools/call` request.
struct ResolvedMcpToolCall {
    /// Sent as `params.name`.
    tool_name: String,
    /// Raw JSON text sent (after parsing) as `params.arguments`.
    arguments_json: String,
}
/// The MCP call to perform, with its parameters ready to be serialized by
/// `materialize_mcp_http_operation`.
enum ResolvedMcpCall {
    /// JSON-RPC method `initialize`.
    Initialize(ResolvedMcpInitialize),
    /// JSON-RPC method `tools/list`.
    ToolsList,
    /// JSON-RPC method `resources/list`.
    ResourcesList,
    /// JSON-RPC method `tools/call`.
    ToolsCall(ResolvedMcpToolCall),
}
/// Result of `resolve_mcp_request`: the HTTP operation to send plus the
/// descriptive values derived while resolving it.
struct ResolvedMcpRequest {
    /// HTTP operation whose body is the JSON-RPC payload for `call`.
    http_operation: HttpOperation,
    /// The resolved call that `http_operation` encodes.
    call: ResolvedMcpCall,
    /// Descriptive attributes of the request (call name, session name, and
    /// per-call values such as the tool name).
    request_attributes: HashMap<String, String>,
    /// `MCP_*` values collected for the request (for example `MCP_CALL`).
    export_overrides: HashMap<String, String>,
}
/// Resolves a GraphQL operation's document, operation name, and variables
/// against `context_map` (then prompt injection) and builds the HTTP operation
/// that carries them.
///
/// Values are resolved in the order document, operation name, variables.
///
/// # Errors
///
/// Fails when prompt injection fails for any value or when the variables are
/// not valid JSON.
fn resolve_graphql_request(
    operation: &GraphqlOperation,
    context_map: &HashMap<String, String>,
) -> Result<ResolvedGraphqlRequest, Box<dyn Error>> {
    let templated_document = context::resolve_with_context(&operation.document, context_map);
    let document = context::try_inject_from_prompt(&templated_document)?;

    let operation_name = match operation.operation_name.as_ref() {
        Some(raw) => Some(context::try_inject_from_prompt(
            &context::resolve_with_context(raw, context_map),
        )?),
        None => None,
    };
    let variables_json = match operation.variables_json.as_ref() {
        Some(raw) => Some(context::try_inject_from_prompt(
            &context::resolve_with_context(raw, context_map),
        )?),
        None => None,
    };

    let http_operation = materialize_graphql_http_operation(
        &operation.http,
        operation_name.clone(),
        document.clone(),
        variables_json.clone(),
    )?;

    Ok(ResolvedGraphqlRequest {
        http_operation,
        operation_name,
        document,
        variables_json,
    })
}
fn materialize_graphql_http_operation(
http_operation: &HttpOperation,
operation_name: Option<String>,
document: String,
variables_json: Option<String>,
) -> Result<HttpOperation, serde_json::Error> {
let mut payload = serde_json::Map::new();
payload.insert(
"query".to_string(),
serde_json::Value::String(document),
);
if let Some(operation_name) = &operation_name {
payload.insert(
"operationName".to_string(),
serde_json::Value::String(operation_name.clone()),
);
}
if let Some(variables_json) = &variables_json {
payload.insert(
"variables".to_string(),
serde_json::from_str::<serde_json::Value>(variables_json)?,
);
}
let mut http = http_operation.clone();
http.body = Some(serde_json::Value::Object(payload).to_string());
if http.body_content_type.is_none()
&& !http
.headers
.keys()
.any(|key| key.eq_ignore_ascii_case("content-type"))
{
http.body_content_type = Some("application/json".to_string());
}
if !http
.headers
.keys()
.any(|key| key.eq_ignore_ascii_case("accept"))
{
http.headers.insert(
"Accept".to_string(),
"application/graphql-response+json, application/json".to_string(),
);
}
Ok(http)
}
/// Converts a declared MCP call into its resolved form by filling in defaults
/// only; declared values are copied as written, without context resolution.
///
/// Defaults: the default protocol version and client name constants, the crate
/// version as client version, and `{}` for capabilities and tool arguments.
fn resolved_mcp_call_for_render(call: &McpCall) -> ResolvedMcpCall {
    let value_or = |declared: &Option<String>, fallback: &str| -> String {
        declared.clone().unwrap_or_else(|| fallback.to_string())
    };
    match call {
        McpCall::ToolsList => ResolvedMcpCall::ToolsList,
        McpCall::ResourcesList => ResolvedMcpCall::ResourcesList,
        McpCall::Initialize(initialize) => {
            let protocol_version =
                value_or(&initialize.protocol_version, DEFAULT_MCP_PROTOCOL_VERSION);
            let client_name = value_or(&initialize.client_name, DEFAULT_MCP_CLIENT_NAME);
            let client_version = value_or(&initialize.client_version, env!("CARGO_PKG_VERSION"));
            let capabilities_json = value_or(&initialize.capabilities_json, "{}");
            ResolvedMcpCall::Initialize(ResolvedMcpInitialize {
                protocol_version,
                client_name,
                client_version,
                capabilities_json,
            })
        }
        McpCall::ToolsCall(tool_call) => ResolvedMcpCall::ToolsCall(ResolvedMcpToolCall {
            tool_name: tool_call.tool_name.clone(),
            arguments_json: value_or(&tool_call.arguments_json, "{}"),
        }),
    }
}
/// Resolves an MCP operation into the HTTP request to send plus its
/// descriptive attributes and exports.
///
/// For every call except `initialize`, a named session must already exist with
/// the MCP protocol; its stored metadata is merged into the HTTP operation.
/// Call parameters are resolved against `context_map`, defaults are applied,
/// and `capabilities` / `arguments` must be JSON objects. Each resolved request
/// takes the next value of the process-wide JSON-RPC id counter.
///
/// # Errors
///
/// Fails when the session is missing or of another protocol, session metadata
/// cannot be applied, a value cannot be resolved, a JSON field is not an
/// object, or the payload cannot be built.
fn resolve_mcp_request(
    request: &Request,
    operation: &McpOperation,
    context_map: &HashMap<String, String>,
    sessions: &SessionRegistry,
) -> Result<ResolvedMcpRequest, Box<dyn Error>> {
    let mut http_operation = operation.http.clone();

    // `initialize` creates the session, so only follow-up calls need one.
    let is_initialize = matches!(operation.call, McpCall::Initialize(_));
    if let (Some(session_name), false) = (request.session_name.as_ref(), is_initialize) {
        let Some(session) = sessions.get(session_name) else {
            return Err(
                format!("MCP session '{}' has not been initialized", session_name).into(),
            );
        };
        if session.protocol != RequestProtocol::Mcp {
            return Err(format!(
                "session '{}' belongs to protocol '{}' not 'mcp'",
                session_name,
                session.protocol.as_str()
            )
            .into());
        }
        hydrate_mcp_http_operation_from_session(&mut http_operation, &session.metadata)?;
    }

    let method_name = operation.call.method_name().to_string();
    let mut request_attributes = HashMap::from([("call".to_string(), method_name.clone())]);
    let mut export_overrides = HashMap::from([("MCP_CALL".to_string(), method_name)]);
    if let Some(session_name) = &request.session_name {
        request_attributes.insert("sessionName".to_string(), session_name.clone());
        export_overrides.insert("MCP_SESSION".to_string(), session_name.clone());
    }

    let call = match &operation.call {
        McpCall::ToolsList => ResolvedMcpCall::ToolsList,
        McpCall::ResourcesList => ResolvedMcpCall::ResourcesList,
        McpCall::Initialize(initialize) => {
            let protocol_version = resolve_mcp_string(
                initialize.protocol_version.as_deref(),
                context_map,
                Some(DEFAULT_MCP_PROTOCOL_VERSION),
            )?
            .expect("defaulted protocol version should exist");
            let client_name = resolve_mcp_string(
                initialize.client_name.as_deref(),
                context_map,
                Some(DEFAULT_MCP_CLIENT_NAME),
            )?
            .expect("defaulted client name should exist");
            let client_version = resolve_mcp_string(
                initialize.client_version.as_deref(),
                context_map,
                Some(env!("CARGO_PKG_VERSION")),
            )?
            .expect("defaulted client version should exist");
            let capabilities_json = resolve_mcp_string(
                initialize.capabilities_json.as_deref(),
                context_map,
                Some("{}"),
            )?
            .expect("defaulted capabilities should exist");
            ensure_json_object("capabilities", &capabilities_json)?;

            request_attributes.extend([
                ("protocolVersion".to_string(), protocol_version.clone()),
                ("clientName".to_string(), client_name.clone()),
                ("clientVersion".to_string(), client_version.clone()),
                ("capabilities".to_string(), capabilities_json.clone()),
            ]);
            export_overrides.extend([
                ("MCP_PROTOCOL_VERSION".to_string(), protocol_version.clone()),
                ("MCP_CLIENT_NAME".to_string(), client_name.clone()),
                ("MCP_CLIENT_VERSION".to_string(), client_version.clone()),
                ("MCP_CAPABILITIES".to_string(), capabilities_json.clone()),
            ]);

            ResolvedMcpCall::Initialize(ResolvedMcpInitialize {
                protocol_version,
                client_name,
                client_version,
                capabilities_json,
            })
        }
        McpCall::ToolsCall(tool_call) => {
            let tool_name = resolve_mcp_string(Some(&tool_call.tool_name), context_map, None)?
                .ok_or_else(|| "MCP tool calls require a tool name".to_string())?;
            let arguments_json = resolve_mcp_string(
                tool_call.arguments_json.as_deref(),
                context_map,
                Some("{}"),
            )?
            .expect("defaulted arguments should exist");
            ensure_json_object("arguments", &arguments_json)?;

            request_attributes.extend([
                ("tool".to_string(), tool_name.clone()),
                ("arguments".to_string(), arguments_json.clone()),
            ]);
            export_overrides.extend([
                ("MCP_TOOL".to_string(), tool_name.clone()),
                ("MCP_ARGUMENTS".to_string(), arguments_json.clone()),
            ]);

            ResolvedMcpCall::ToolsCall(ResolvedMcpToolCall {
                tool_name,
                arguments_json,
            })
        }
    };

    let request_id = MCP_JSON_RPC_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
    let http_operation = materialize_mcp_http_operation(&http_operation, &call, request_id)?;

    Ok(ResolvedMcpRequest {
        http_operation,
        call,
        request_attributes,
        export_overrides,
    })
}
/// Resolves an optional MCP string value.
///
/// A present `raw` value is resolved against `context_map` and then passed
/// through prompt injection. An absent one yields `default` as-is (no
/// resolution), or `None` when there is no default.
///
/// # Errors
///
/// Fails when prompt injection fails for a present value.
fn resolve_mcp_string(
    raw: Option<&str>,
    context_map: &HashMap<String, String>,
    default: Option<&str>,
) -> Result<Option<String>, Box<dyn Error>> {
    let Some(raw) = raw else {
        return Ok(default.map(String::from));
    };
    let templated = context::resolve_with_context(raw, context_map);
    let resolved = context::try_inject_from_prompt(&templated)?;
    Ok(Some(resolved))
}
/// Pairs a response body with its JSON form.
///
/// If the body parses as JSON, it is returned unchanged alongside the parsed
/// value. Otherwise, for MCP responses served as `text/event-stream`, the
/// JSON-RPC response found in the stream (preferring
/// `expected_mcp_response_id`) replaces the body. In every other case the body
/// is returned unchanged with no JSON.
fn normalize_http_response_body(
    protocol: RequestProtocol,
    headers: &reqwest::header::HeaderMap,
    body: &str,
    expected_mcp_response_id: Option<&Value>,
) -> NormalizedHttpResponse {
    let direct_json = serde_json::from_str::<Value>(body).ok();

    // The event-stream fallback applies only to MCP bodies that are not JSON.
    let try_event_stream = direct_json.is_none()
        && matches!(protocol, RequestProtocol::Mcp)
        && is_event_stream_response(headers);
    if try_event_stream {
        if let Some(envelope) = extract_mcp_response_from_sse(body, expected_mcp_response_id) {
            return NormalizedHttpResponse {
                body: envelope.to_string(),
                json: Some(envelope),
            };
        }
    }

    NormalizedHttpResponse {
        body: body.to_string(),
        json: direct_json,
    }
}
fn is_event_stream_response(headers: &reqwest::header::HeaderMap) -> bool {
headers
.iter()
.find(|(name, _)| name.as_str().eq_ignore_ascii_case("content-type"))
.and_then(|(_, value)| value.to_str().ok())
.map(|value| {
value
.split(';')
.next()
.map(|mime| mime.trim().eq_ignore_ascii_case("text/event-stream"))
.unwrap_or(false)
})
.unwrap_or(false)
}
/// Returns a copy of the top-level `id` member of a JSON body, or `None` when
/// the body is not JSON or has no such member.
fn extract_json_rpc_id(body: &str) -> Option<Value> {
    let envelope: Value = serde_json::from_str(body).ok()?;
    envelope.get("id").cloned()
}
/// Scans a complete SSE body for JSON-RPC responses and returns the best one.
///
/// Each event's `data:` lines are joined with `\n` and parsed as JSON;
/// payloads that are not JSON-RPC responses are ignored. The last response
/// whose `id` equals `expected_id` is preferred; otherwise the last response
/// that did not match is returned. Comment lines and other fields are skipped.
fn extract_mcp_response_from_sse(body: &str, expected_id: Option<&Value>) -> Option<Value> {
    /// Consumes the collected data lines and returns their payload if it is a
    /// JSON-RPC response.
    fn take_response(data_lines: &mut Vec<String>) -> Option<Value> {
        if data_lines.is_empty() {
            return None;
        }
        let payload = data_lines.join("\n");
        data_lines.clear();
        if payload.trim().is_empty() {
            return None;
        }
        let json = serde_json::from_str::<Value>(&payload).ok()?;
        is_json_rpc_response(&json).then_some(json)
    }

    let mut pending_data: Vec<String> = Vec::new();
    let mut matched_response: Option<Value> = None;
    let mut fallback_response: Option<Value> = None;

    // A trailing empty line makes the last event flush even when the body does
    // not end with a blank line.
    let lines = body
        .lines()
        .map(|line| line.trim_end_matches('\r'))
        .chain(std::iter::once(""));
    for line in lines {
        if line.is_empty() {
            if let Some(response) = take_response(&mut pending_data) {
                let id_matches =
                    matches!(expected_id, Some(expected) if response.get("id") == Some(expected));
                if id_matches {
                    matched_response = Some(response);
                } else {
                    fallback_response = Some(response);
                }
            }
            continue;
        }
        if line.starts_with(':') {
            continue;
        }
        if let Some(data) = line.strip_prefix("data:") {
            pending_data.push(data.trim_start().to_string());
        }
    }

    matched_response.or(fallback_response)
}
/// Reports whether `json` looks like a JSON-RPC 2.0 response: `jsonrpc` is the
/// string `"2.0"` and a `result` or `error` member is present.
fn is_json_rpc_response(json: &Value) -> bool {
    let is_version_2 = json.get("jsonrpc").and_then(Value::as_str) == Some("2.0");
    let has_outcome = json.get("result").is_some() || json.get("error").is_some();
    is_version_2 && has_outcome
}
fn ensure_json_object(field_name: &str, raw_json: &str) -> Result<(), Box<dyn Error>> {
let parsed = serde_json::from_str::<serde_json::Value>(raw_json)?;
if !parsed.is_object() {
return Err(format!("MCP {} must be a JSON object", field_name).into());
}
Ok(())
}
/// Fills gaps in `http_operation` from stored MCP session metadata.
///
/// Headers from `headersJson` are added only when the operation has no header
/// of the same name (case-insensitive). The stored `url` is used only when the
/// operation's URL is blank.
///
/// # Errors
///
/// Fails when `headersJson` is not a JSON object of strings.
fn hydrate_mcp_http_operation_from_session(
    http_operation: &mut HttpOperation,
    metadata: &HashMap<String, String>,
) -> Result<(), Box<dyn Error>> {
    if let Some(headers_json) = metadata.get("headersJson") {
        let session_headers: HashMap<String, String> = serde_json::from_str(headers_json)?;
        for (name, value) in session_headers {
            let already_set = http_operation
                .headers
                .keys()
                .any(|existing| existing.eq_ignore_ascii_case(&name));
            if already_set {
                continue;
            }
            http_operation.headers.insert(name, value);
        }
    }

    let url_is_blank = http_operation.url.trim().is_empty();
    if let (true, Some(session_url)) = (url_is_blank, metadata.get("url")) {
        http_operation.url = session_url.clone();
    }
    Ok(())
}
fn materialize_mcp_http_operation(
http_operation: &HttpOperation,
call: &ResolvedMcpCall,
request_id: u64,
) -> Result<HttpOperation, serde_json::Error> {
let payload = match call {
ResolvedMcpCall::Initialize(initialize) => {
let capabilities = serde_json::from_str::<serde_json::Value>(&initialize.capabilities_json)?;
serde_json::json!({
"jsonrpc": "2.0",
"id": request_id,
"method": "initialize",
"params": {
"protocolVersion": initialize.protocol_version,
"capabilities": capabilities,
"clientInfo": {
"name": initialize.client_name,
"version": initialize.client_version,
}
}
})
}
ResolvedMcpCall::ToolsList => serde_json::json!({
"jsonrpc": "2.0",
"id": request_id,
"method": "tools/list",
"params": {}
}),
ResolvedMcpCall::ResourcesList => serde_json::json!({
"jsonrpc": "2.0",
"id": request_id,
"method": "resources/list",
"params": {}
}),
ResolvedMcpCall::ToolsCall(tool_call) => {
let arguments = serde_json::from_str::<serde_json::Value>(&tool_call.arguments_json)?;
serde_json::json!({
"jsonrpc": "2.0",
"id": request_id,
"method": "tools/call",
"params": {
"name": tool_call.tool_name,
"arguments": arguments,
}
})
}
};
let mut http = http_operation.clone();
http.body = Some(payload.to_string());
if http.body_content_type.is_none()
&& !http
.headers
.keys()
.any(|key| key.eq_ignore_ascii_case("content-type"))
{
http.body_content_type = Some("application/json".to_string());
}
if !http
.headers
.keys()
.any(|key| key.eq_ignore_ascii_case("accept"))
{
http.headers.insert(
"Accept".to_string(),
"application/json, text/event-stream".to_string(),
);
}
Ok(http)
}
/// Returns a copy of `http_operation` whose body is the
/// `notifications/initialized` JSON-RPC notification (it carries no `id`).
///
/// The content type defaults to `application/json` when neither
/// `body_content_type` nor a `content-type` header is set. An `Accept` header
/// of `application/json, text/event-stream` is added when none is present.
fn materialize_mcp_initialized_notification_http_operation(
    http_operation: &HttpOperation,
) -> HttpOperation {
    let notification = serde_json::json!({
        "jsonrpc": "2.0",
        "method": "notifications/initialized",
        "params": {}
    });

    let mut http = http_operation.clone();
    http.body = Some(notification.to_string());

    let has_content_type_header = http
        .headers
        .keys()
        .any(|name| name.eq_ignore_ascii_case("content-type"));
    if http.body_content_type.is_none() && !has_content_type_header {
        http.body_content_type = Some("application/json".to_string());
    }

    let has_accept_header = http
        .headers
        .keys()
        .any(|name| name.eq_ignore_ascii_case("accept"));
    if !has_accept_header {
        http.headers.insert(
            "Accept".to_string(),
            "application/json, text/event-stream".to_string(),
        );
    }
    http
}
/// Sends the MCP `notifications/initialized` notification, using
/// `http_operation` as the template for URL, method, and headers.
///
/// A fresh `reqwest::Client` is created for the single request.
///
/// # Errors
///
/// Fails when the request cannot be built or sent, or when the response status
/// is anything other than 200, 202, or 204.
async fn send_mcp_initialized_notification(
    http_operation: &HttpOperation,
    context_map: &HashMap<String, String>,
) -> Result<(), Box<dyn Error>> {
    let client = reqwest::Client::new();
    let notification_operation =
        materialize_mcp_initialized_notification_http_operation(http_operation);
    let built_request = build_reqwest_request(&notification_operation, &client, context_map)?;
    let response = client.execute(built_request).await?;
    let status = response.status();
    if !matches!(status.as_u16(), 200 | 202 | 204) {
        return Err(format!(
            "MCP initialized notification returned unexpected HTTP status {}",
            status
        )
        .into());
    }
    Ok(())
}
/// Builds a `reqwest::Request` from `operation`.
///
/// Every templated string (URL, query values, body, content type, header
/// values) is first passed through `context::resolve_with_context` with
/// `context_map` and then through `context::try_inject_from_prompt`.
///
/// Assembly order, which later steps rely on to override earlier ones:
///
/// 1. method + URL;
/// 2. multipart payload (if the operation has form data) and query parameters;
/// 3. the raw body, which replaces a multipart body when both are present;
/// 4. `body_content_type`, which replaces any `Content-Type` set so far;
/// 5. explicit headers, each replacing any header of the same name.
///
/// # Errors
///
/// Propagates failures from prompt injection, multipart construction,
/// `reqwest` request building, and header name/value parsing.
pub(crate) fn build_reqwest_request(
    operation: &HttpOperation,
    client: &reqwest::Client,
    context_map: &HashMap<String, String>,
) -> Result<reqwest::Request, Box<dyn std::error::Error>> {
    let resolved_target = context::resolve_with_context(&operation.url, context_map);
    let target = context::try_inject_from_prompt(&resolved_target)?;
    let mut builder = client.request(operation.method.clone(), target);

    // Built before the query parameters so that the resolve/prompt sequence
    // stays the same as the field order used above.
    let multipart = build_multipart_payload(operation, context_map)?;

    let query = operation
        .query_params
        .iter()
        .map(
            |(name, raw)| -> Result<(String, String), Box<dyn std::error::Error>> {
                let resolved = context::resolve_with_context(raw, context_map);
                let injected = context::try_inject_from_prompt(&resolved)?;
                Ok((name.clone(), injected))
            },
        )
        .collect::<Result<HashMap<String, String>, Box<dyn std::error::Error>>>()?;
    builder = builder.query(&query);

    if let Some((payload, boundary)) = multipart {
        builder = builder
            .header(
                reqwest::header::CONTENT_TYPE,
                format!("multipart/form-data; boundary={boundary}"),
            )
            .body(payload);
    }

    if let Some(raw_body) = &operation.body {
        let resolved = context::resolve_with_context(raw_body, context_map);
        builder = builder.body(context::try_inject_from_prompt(&resolved)?);
    }

    let mut request = builder.build()?;
    let header_map = request.headers_mut();

    if let Some(raw_content_type) = &operation.body_content_type {
        let resolved = context::resolve_with_context(raw_content_type, context_map);
        let injected = context::try_inject_from_prompt(&resolved)?;
        let value = parse_header_value(injected, "Content-Type".to_string())?;
        header_map.remove(reqwest::header::CONTENT_TYPE);
        header_map.insert(reqwest::header::CONTENT_TYPE, value);
    }

    for (raw_name, raw_value) in &operation.headers {
        let resolved = context::resolve_with_context(raw_value, context_map);
        // The name is validated before the value is prompted for.
        let name = parse_header_name(raw_name)?;
        let injected = context::try_inject_from_prompt(&resolved)?;
        let value = parse_header_value(injected, raw_name.clone())?;
        header_map.remove(&name);
        header_map.insert(name, value);
    }

    Ok(request)
}
/// Encodes `operation.form_data` as a `multipart/form-data` body.
///
/// Returns `Ok(None)` when the operation has no form fields. Otherwise returns
/// the encoded bytes together with the boundary string that delimits the
/// parts, so the caller can advertise it in the `Content-Type` header.
///
/// Text fields are passed through `context::resolve_with_context` and
/// `context::try_inject_from_prompt`; file fields are read from disk in full.
///
/// # Errors
///
/// Fails when prompt injection fails, when a file path has no UTF-8 file
/// name, or when a file cannot be read.
fn build_multipart_payload(
    operation: &HttpOperation,
    context_map: &HashMap<String, String>,
) -> Result<Option<(Vec<u8>, String)>, Box<dyn std::error::Error>> {
    if operation.form_data.is_empty() {
        return Ok(None);
    }

    let boundary = generate_multipart_boundary();
    let part_delimiter = format!("--{boundary}\r\n");
    let mut payload: Vec<u8> = Vec::new();

    for (field, part) in &operation.form_data {
        let field = escape_multipart_quoted_string(field);

        // Each part is its header block (ending in the blank line) plus raw content.
        let (part_headers, content) = match part {
            FormDataType::Text(template) => {
                let resolved = context::resolve_with_context(template, context_map);
                let injected = context::try_inject_from_prompt(&resolved)?;
                (
                    format!(
                        "Content-Disposition: form-data; name=\"{}\"\r\n\r\n",
                        field
                    ),
                    injected.into_bytes(),
                )
            }
            FormDataType::File(path) => {
                let file_name = match path.file_name().and_then(|os_name| os_name.to_str()) {
                    Some(valid) => valid,
                    None => {
                        return Err(std::io::Error::new(
                            std::io::ErrorKind::InvalidInput,
                            format!(
                                "Multipart file names must be valid UTF-8: {}",
                                path.display()
                            ),
                        )
                        .into());
                    }
                };
                let file_name = escape_multipart_quoted_string(file_name);
                (
                    format!(
                        "Content-Disposition: form-data; name=\"{}\"; filename=\"{}\"\r\n\r\n",
                        field, file_name
                    ),
                    std::fs::read(path)?,
                )
            }
        };

        payload.extend_from_slice(part_delimiter.as_bytes());
        payload.extend_from_slice(part_headers.as_bytes());
        payload.extend_from_slice(&content);
        payload.extend_from_slice(b"\r\n");
    }

    // Closing delimiter: the boundary followed by two extra dashes.
    payload.extend_from_slice(format!("--{boundary}--\r\n").as_bytes());

    Ok(Some((payload, boundary)))
}
/// Renders `operation` as a single-line `curl` command.
///
/// The URL, query parameters, headers, form fields and body are emitted as
/// stored on the operation; no context substitution or URL encoding is done
/// here. When `body_content_type` is set and no `Content-Type` header was
/// given explicitly (case-insensitive match), a `Content-Type` header is
/// emitted ahead of the other headers.
///
/// Every value is placed inside a single-quoted shell word. Embedded single
/// quotes are rewritten as `'\''` so a value containing an apostrophe cannot
/// terminate the quoting and corrupt (or inject into) the command.
fn render_http_curl(operation: &HttpOperation) -> String {
    /// Makes `value` safe to embed between single quotes in a POSIX shell:
    /// close the quote, emit an escaped quote, reopen the quote.
    fn escape_single_quotes(value: &str) -> String {
        value.replace('\'', r"'\''")
    }

    let has_manual_content_type = operation
        .headers
        .keys()
        .any(|key| key.eq_ignore_ascii_case("content-type"));

    let headers = operation
        .headers
        .iter()
        .map(|(key, value)| {
            format!(
                "-H '{}: {}'",
                escape_single_quotes(key),
                escape_single_quotes(value)
            )
        })
        .collect::<Vec<String>>()
        .join(" ");
    let headers = match (&operation.body_content_type, has_manual_content_type) {
        (Some(content_type), false) => {
            format!(
                "-H 'Content-Type: {}' {}",
                escape_single_quotes(content_type),
                headers
            )
        }
        _ => headers,
    };

    let query_params = operation
        .query_params
        .iter()
        .map(|(key, value)| format!("{}={}", key, value))
        .collect::<Vec<String>>()
        .join("&");
    let query_params = if query_params.is_empty() {
        String::new()
    } else {
        format!("?{}", query_params)
    };

    let form_data = operation
        .form_data
        .iter()
        .map(|(key, value)| match value {
            FormDataType::Text(text) => format!(
                "-F '{}={}'",
                escape_single_quotes(key),
                escape_single_quotes(text)
            ),
            FormDataType::File(filepath) => format!(
                "-F '{}=@{}'",
                escape_single_quotes(key),
                escape_single_quotes(&filepath.display().to_string())
            ),
        })
        .collect::<Vec<String>>()
        .join(" ");

    // NOTE(review): the space after the opening quote is kept from the
    // existing output format; confirm whether it is intentional.
    let body = match &operation.body {
        Some(body) => format!("-d ' {}'", escape_single_quotes(body)),
        None => String::new(),
    };

    // Empty sections still leave their separating spaces, as before.
    format!(
        "curl -X {} '{}{}' {} {} {}",
        operation.method,
        escape_single_quotes(&operation.url),
        escape_single_quotes(&query_params),
        headers,
        form_data,
        body
    )
}
/// Escapes `value` for use inside a quoted `name="…"` / `filename="…"`
/// parameter of a multipart `Content-Disposition` part header.
///
/// * `\` and `"` are backslash-escaped so they cannot end the quoted string.
/// * CR and LF are percent-encoded as `%0D` / `%0A`. Prefixing them with a
///   backslash would leave the raw control characters in the header line,
///   letting a crafted field or file name end the header early and inject
///   additional part headers.
///
/// All other characters are copied through unchanged.
fn escape_multipart_quoted_string(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\r' => escaped.push_str("%0D"),
            '\n' => escaped.push_str("%0A"),
            other => escaped.push(other),
        }
    }
    escaped
}
/// Produces a boundary string for a multipart body.
///
/// The result is a fixed run of dashes followed by 32 lowercase hex digits:
/// the upper 64 bits of the current Unix time in nanoseconds, then the lower
/// 64 bits XOR-ed with a process-wide counter that is bumped on every call.
/// If the system clock reads earlier than the Unix epoch the timestamp is
/// treated as zero.
fn generate_multipart_boundary() -> String {
    let sequence = {
        let nanos_since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos();
        let high_bits = (nanos_since_epoch >> 64) as u64;
        // Truncation to the low 64 bits is intentional.
        let low_bits = nanos_since_epoch as u64;
        (high_bits, low_bits)
    };
    let call_index: u64 = MULTIPART_BOUNDARY_COUNTER.fetch_add(1, Ordering::Relaxed);
    let (high_bits, low_bits) = sequence;
    let mixed_low_bits = low_bits ^ call_index;

    format!(
        "------------------------{:016x}{:016x}",
        high_bits, mixed_low_bits,
    )
}
/// Error describing a request header name that could not be parsed.
#[derive(Debug)]
struct InvalidRequestHeaderName {
    /// The offending header name, exactly as it was supplied.
    header: String,
}

impl std::error::Error for InvalidRequestHeaderName {}

impl std::fmt::Display for InvalidRequestHeaderName {
    /// Writes `Invalid header name '<header>'`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { header } = self;
        write!(f, "Invalid header name '{header}'")
    }
}
/// Error describing a request header value that could not be parsed.
#[derive(Debug)]
struct InvalidRequestHeaderValue {
    /// Name of the header the value was meant for.
    header: String,
    /// The rejected value, exactly as it was supplied.
    value: String,
}

impl std::error::Error for InvalidRequestHeaderValue {}

impl std::fmt::Display for InvalidRequestHeaderValue {
    /// Writes `Invalid value '<value>' for header '<header>'`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { header, value } = self;
        write!(f, "Invalid value '{value}' for header '{header}'")
    }
}
/// Parses `header` into a `HeaderName`.
///
/// # Errors
///
/// Returns a boxed `InvalidRequestHeaderName` carrying the original text when
/// parsing fails; the underlying parse error is discarded.
fn parse_header_name(header: &str) -> Result<HeaderName, Box<dyn std::error::Error>> {
    let parsed = header.parse::<HeaderName>();
    match parsed {
        Ok(name) => Ok(name),
        Err(_) => Err(Box::new(InvalidRequestHeaderName {
            header: header.to_string(),
        })),
    }
}
/// Parses `value` into a `HeaderValue` destined for the header named `header`.
///
/// # Errors
///
/// Returns a boxed `InvalidRequestHeaderValue` carrying both the header name
/// and the rejected value when parsing fails; the underlying parse error is
/// discarded.
fn parse_header_value(
    value: String,
    header: String,
) -> Result<HeaderValue, Box<dyn std::error::Error>> {
    let parsed = value.parse::<HeaderValue>();
    match parsed {
        Ok(header_value) => Ok(header_value),
        // Both strings are moved into the error so the message can show them.
        Err(_) => Err(Box::new(InvalidRequestHeaderValue { header, value })),
    }
}