use crate::engine::MitmEngine;
use crate::observe::{Event, EventConsumer, FlowContext};
use crate::policy::PolicyEngine;
use crate::protocol::ApplicationProtocol;
use crate::tls::{CertificateAuthorityConfig, MitmCertificateStore, UpstreamTlsConfigCache};
use std::io;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
mod flow_hooks;
mod mitmproxy_tls_ops;
mod runtime_governor;
mod tls_diagnostics;
mod tls_learning;
pub use crate::config::H2ResponseOverflowMode;
#[cfg(feature = "__internal")]
pub use crate::types::FrameKind;
use event_emitters::unknown_context;
use event_emitters::{ingest_tls_learning_signal_with_audit, tls_error_to_io_invalid_input};
use flow_connect_tunnel::handle_client;
pub use flow_hooks::{FlowHooks, NoopFlowHooks, RawRequest, RawResponse, StreamChunk};
use flow_intercept::load_upstream_client_auth_pem;
use io_timeouts::install_io_timeout_config;
#[cfg(feature = "__internal")]
pub use mitmproxy_tls_ops::MitmproxyTlsHook;
pub use mitmproxy_tls_ops::{
adapt_mitmproxy_tls_callback, MitmproxyTlsAdapterEvent, MitmproxyTlsCallback,
};
pub use runtime_governor::{RuntimeBudgetConfig, RuntimeGovernor, RuntimeObservabilitySnapshot};
use socket_hardening::{
apply_per_connection_socket_hardening, bind_listener_with_socket_hardening,
is_benign_socket_close_error,
};
pub use tls_diagnostics::{TlsDiagnostics, TlsDiagnosticsSnapshot};
#[cfg(feature = "__internal")]
pub use tls_learning::TlsLearningDecision;
pub use tls_learning::{
TlsLearningGuardrails, TlsLearningOutcome, TlsLearningSignal, TlsLearningSnapshot,
};
use tls_profile_mapping::{
insert_tls_fingerprint_provenance, map_downstream_cert_profile, map_upstream_client_auth_mode,
map_upstream_sni_mode, resolve_effective_upstream_tls_profile,
};
/// 8 KiB.
/// NOTE(review): presumably the buffer size used for a single IO read/write; the use sites
/// are not in this part of the file — confirm against the relay modules.
pub(crate) const IO_CHUNK_SIZE: usize = 8 * 1024;
/// 8 KiB.
/// NOTE(review): presumably the longest chunk-size line accepted when relaying chunked
/// bodies — confirm against the body relay code.
pub(crate) const CHUNK_LINE_LIMIT: usize = 8 * 1024;
/// Provider label `"rustls"`.
/// NOTE(review): presumably attached to TLS events as the `tls_ops_provider` attribute —
/// confirm in the event emitters.
pub(crate) const TLS_OPS_PROVIDER: &str = "rustls";
/// Default value, in milliseconds, of `SidecarConfig::accept_retry_backoff_ms`.
const DEFAULT_LISTENER_ACCEPT_RETRY_BACKOFF_MS: u64 = 100;
/// Listener and IO-timeout settings for a `SidecarServer`.
///
/// The `Default` implementation supplies the values quoted on each field below.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SidecarConfig {
/// Defaults to `"127.0.0.1"`.
/// NOTE(review): presumably the host the TCP listener binds to; the bind itself happens in
/// `bind_listener_with_socket_hardening`, which is not visible here — confirm.
pub listen_addr: String,
/// Defaults to `8080`.
/// NOTE(review): presumably the TCP port the listener binds to — confirm as above.
pub listen_port: u16,
/// Forwarded to the per-connection handler for every accepted connection. Defaults to 64 KiB.
/// NOTE(review): presumably the maximum size of a CONNECT request head — confirm.
pub max_connect_head_bytes: usize,
/// Forwarded to the per-connection handler for every accepted connection. Defaults to 64 KiB.
/// NOTE(review): presumably the maximum size of an HTTP message head — confirm.
pub max_http_head_bytes: usize,
/// Milliseconds to sleep before retrying `accept` after a transient listener error.
/// Values below 1 are treated as 1. Defaults to 100.
pub accept_retry_backoff_ms: u64,
/// Passed to `install_io_timeout_config` at server construction. Defaults to 30 seconds.
pub idle_watchdog_timeout: Duration,
/// Passed to `install_io_timeout_config` at server construction. Defaults to 600 seconds.
pub websocket_idle_watchdog_timeout: Duration,
/// Passed to `install_io_timeout_config` at server construction. Defaults to 10 seconds.
pub upstream_connect_timeout: Duration,
/// Passed to `install_io_timeout_config` at server construction. Defaults to 30 seconds.
pub stream_stage_timeout: Duration,
/// Passed to `install_io_timeout_config` at server construction. Defaults to 120 seconds.
pub h2_body_idle_timeout: Duration,
/// Passed to `install_io_timeout_config` at server construction.
/// Defaults to `H2ResponseOverflowMode::TruncateContinue`.
pub h2_response_overflow_mode: H2ResponseOverflowMode,
/// Defaults to `None`.
/// NOTE(review): presumably the path of an additional Unix-domain listener on unix targets;
/// the consumer is not in this part of the file — confirm.
pub unix_socket_path: Option<String>,
}
impl Default for SidecarConfig {
fn default() -> Self {
Self {
listen_addr: "127.0.0.1".to_string(),
listen_port: 8080,
max_connect_head_bytes: 64 * 1024,
max_http_head_bytes: 64 * 1024,
accept_retry_backoff_ms: DEFAULT_LISTENER_ACCEPT_RETRY_BACKOFF_MS,
idle_watchdog_timeout: Duration::from_secs(30),
websocket_idle_watchdog_timeout: Duration::from_secs(600),
upstream_connect_timeout: Duration::from_secs(10),
stream_stage_timeout: Duration::from_secs(30),
h2_body_idle_timeout: Duration::from_secs(120),
h2_response_overflow_mode: H2ResponseOverflowMode::TruncateContinue,
unix_socket_path: None,
}
}
}
/// A sidecar proxy instance: its configuration plus the shared, reference-counted state
/// that is handed to every accepted connection.
///
/// `P` is the policy engine type and `S` the event consumer type of the wrapped
/// `MitmEngine`.
pub struct SidecarServer<P, S>
where
P: PolicyEngine + Send + Sync + 'static,
S: EventConsumer + Send + Sync + 'static,
{
// Listener limits and timeouts supplied at construction.
config: SidecarConfig,
// Engine used to allocate flow ids, read engine configuration, and emit events.
engine: Arc<MitmEngine<P, S>>,
// Certificate store built from the engine's CA settings at construction.
cert_store: Arc<MitmCertificateStore>,
// Governor created from the engine's flow/byte budgets; also installed globally.
runtime_governor: Arc<runtime_governor::RuntimeGovernor>,
// TLS failure counters, updated by `ingest_mitmproxy_tls_callback`.
tls_diagnostics: Arc<TlsDiagnostics>,
// TLS learning guardrails fed by the `ingest_*` methods.
tls_learning: Arc<TlsLearningGuardrails>,
// Connection lifecycle hooks invoked from the accept loop.
flow_hooks: Arc<dyn FlowHooks>,
// Upstream TLS configuration cache built from the engine's upstream TLS settings.
upstream_tls_cache: Arc<UpstreamTlsConfigCache>,
}
/// Bundle of shared handles that the accept loop builds for each accepted connection and
/// moves into that connection's task.
///
/// `Clone` is implemented by hand rather than derived: every field is an `Arc`, so cloning
/// only bumps reference counts and must not require `P: Clone` or `S: Clone`, which
/// `#[derive(Clone)]` would add to the impl's bounds.
pub(crate) struct RuntimeHandles<P, S>
where
    P: PolicyEngine + Send + Sync + 'static,
    S: EventConsumer + Send + Sync + 'static,
{
    pub(crate) engine: Arc<MitmEngine<P, S>>,
    pub(crate) cert_store: Arc<MitmCertificateStore>,
    pub(crate) runtime_governor: Arc<runtime_governor::RuntimeGovernor>,
    pub(crate) tls_diagnostics: Arc<TlsDiagnostics>,
    pub(crate) tls_learning: Arc<TlsLearningGuardrails>,
    pub(crate) flow_hooks: Arc<dyn FlowHooks>,
    pub(crate) upstream_tls_cache: Arc<UpstreamTlsConfigCache>,
}

impl<P, S> Clone for RuntimeHandles<P, S>
where
    P: PolicyEngine + Send + Sync + 'static,
    S: EventConsumer + Send + Sync + 'static,
{
    /// Returns a new bundle pointing at the same shared state (reference-count bumps only).
    fn clone(&self) -> Self {
        Self {
            engine: Arc::clone(&self.engine),
            cert_store: Arc::clone(&self.cert_store),
            runtime_governor: Arc::clone(&self.runtime_governor),
            tls_diagnostics: Arc::clone(&self.tls_diagnostics),
            tls_learning: Arc::clone(&self.tls_learning),
            flow_hooks: Arc::clone(&self.flow_hooks),
            upstream_tls_cache: Arc::clone(&self.upstream_tls_cache),
        }
    }
}
/// The HTTP/1.x versions this module distinguishes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum HttpVersion {
    Http10,
    Http11,
}

impl HttpVersion {
    /// Returns the version token for this variant: `"HTTP/1.0"` or `"HTTP/1.1"`.
    fn as_str(self) -> &'static str {
        match self {
            HttpVersion::Http11 => "HTTP/1.1",
            HttpVersion::Http10 => "HTTP/1.0",
        }
    }
}
/// Body framing of an HTTP/1.x message, as recorded on a request or response head.
///
/// NOTE(review): the variant meanings below are inferred from their names; the parser that
/// selects a mode is not in this part of the file — confirm against `http_head_parser`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum HttpBodyMode {
/// Presumably: the message carries no body.
None,
/// Presumably: the body length in bytes, as declared by a `Content-Length` header.
ContentLength(u64),
/// Presumably: the body uses chunked transfer encoding.
Chunked,
/// Presumably: the body runs until the connection is closed.
CloseDelimited,
}
/// A single header as a name/value pair of owned strings.
///
/// NOTE(review): whether `name` is case-normalized and `value` is trimmed is decided by the
/// parser, which is not visible here — confirm before relying on either.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct HttpHeader {
pub(crate) name: String,
pub(crate) value: String,
}
/// The head (request line and headers) of an HTTP/1.x request.
///
/// NOTE(review): presumably produced by the `http_head_parser` module; construction is not
/// visible in this part of the file, so the field notes below are assumptions to confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct HttpRequestHead {
// Presumably the head bytes exactly as received — confirm.
pub(crate) raw: Vec<u8>,
// Request method token from the request line.
pub(crate) method: String,
// Request target from the request line.
pub(crate) target: String,
pub(crate) version: HttpVersion,
pub(crate) headers: Vec<HttpHeader>,
// How the body following this head is delimited.
pub(crate) body_mode: HttpBodyMode,
// Presumably true when the connection should be closed after this message — confirm.
pub(crate) connection_close: bool,
}
/// The head (status line and headers) of an HTTP/1.x response.
///
/// NOTE(review): presumably produced by the `http_head_parser` module; construction is not
/// visible in this part of the file, so the field notes below are assumptions to confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct HttpResponseHead {
// Presumably the head bytes exactly as received — confirm.
pub(crate) raw: Vec<u8>,
pub(crate) version: HttpVersion,
// Numeric status code from the status line.
pub(crate) status_code: u16,
// Reason phrase from the status line.
pub(crate) reason_phrase: String,
pub(crate) headers: Vec<HttpHeader>,
// How the body following this head is delimited.
pub(crate) body_mode: HttpBodyMode,
// Presumably true when the connection should be closed after this message — confirm.
pub(crate) connection_close: bool,
}
/// A stream paired with a byte buffer owned alongside it.
pub(crate) struct BufferedConn<S> {
    /// The wrapped stream.
    pub(crate) stream: S,
    /// Byte buffer kept next to the stream; empty on construction.
    /// NOTE(review): presumably holds bytes already read from `stream` but not yet
    /// consumed — confirm against the readers.
    pub(crate) read_buf: Vec<u8>,
}

impl<S> BufferedConn<S> {
    /// Wraps `stream` together with an empty read buffer (no allocation is made).
    fn new(stream: S) -> Self {
        let read_buf = Vec::new();
        BufferedConn { stream, read_buf }
    }
}
impl<P, S> SidecarServer<P, S>
where
P: PolicyEngine + Send + Sync + 'static,
S: EventConsumer + Send + Sync + 'static,
{
/// Builds a server with no-op flow hooks.
///
/// Equivalent to calling `new_with_flow_hooks` with `NoopFlowHooks`; see that constructor
/// for the work performed and the errors returned.
pub fn new(config: SidecarConfig, engine: MitmEngine<P, S>) -> io::Result<Self> {
Self::new_with_flow_hooks(config, engine, Arc::new(NoopFlowHooks))
}
pub fn new_with_flow_hooks(
config: SidecarConfig,
engine: MitmEngine<P, S>,
flow_hooks: Arc<dyn FlowHooks>,
) -> io::Result<Self> {
let ca_config = CertificateAuthorityConfig {
ca_cert_pem_path: engine.config.ca_cert_pem_path.clone(),
ca_key_pem_path: engine.config.ca_key_pem_path.clone(),
ca_common_name: engine.config.ca_common_name.clone(),
ca_organization: engine.config.ca_organization.clone(),
leaf_cert_cache_capacity: engine.config.leaf_cert_cache_capacity,
ca_rotate_after_seconds: engine.config.ca_rotate_after_seconds,
downstream_cert_profile: map_downstream_cert_profile(
engine.config.downstream_cert_profile,
),
};
let cert_store =
MitmCertificateStore::new(ca_config).map_err(tls_error_to_io_invalid_input)?;
let runtime_governor = Arc::new(runtime_governor::RuntimeGovernor::new(
RuntimeBudgetConfig {
max_concurrent_flows: engine.config.max_concurrent_flows,
max_in_flight_bytes: engine.config.max_in_flight_bytes,
},
));
runtime_governor::install_global_runtime_governor(Arc::clone(&runtime_governor));
runtime_governor::set_event_queue_depth_global(0);
install_io_timeout_config(
config.idle_watchdog_timeout,
config.websocket_idle_watchdog_timeout,
config.upstream_connect_timeout,
config.stream_stage_timeout,
config.h2_body_idle_timeout,
config.h2_response_overflow_mode,
);
tracing::info!(
stream_stage_timeout_ms = config.stream_stage_timeout.as_millis() as u64,
h2_body_idle_timeout_ms = config.h2_body_idle_timeout.as_millis() as u64,
h2_response_overflow_mode = ?config.h2_response_overflow_mode,
"installed sidecar IO timeout config"
);
let tls_diagnostics = Arc::new(TlsDiagnostics::default());
let tls_learning = Arc::new(TlsLearningGuardrails::new());
let upstream_tls_profile = resolve_effective_upstream_tls_profile(
engine.config.tls_profile,
engine.config.tls_fingerprint_mode,
engine.config.tls_fingerprint_class,
);
let upstream_sni_mode = map_upstream_sni_mode(engine.config.upstream_sni_mode);
let upstream_client_auth_mode =
map_upstream_client_auth_mode(engine.config.upstream_client_auth_mode);
let upstream_client_auth_pem = load_upstream_client_auth_pem(
&engine.config.upstream_client_cert_pem_path,
&engine.config.upstream_client_key_pem_path,
upstream_client_auth_mode,
)
.map_err(|error| {
io::Error::new(
io::ErrorKind::InvalidInput,
format!("upstream client auth material load failed: {error}"),
)
})?;
let upstream_tls_cache = Arc::new(UpstreamTlsConfigCache::new(
upstream_tls_profile,
upstream_sni_mode,
upstream_client_auth_mode,
upstream_client_auth_pem,
));
Ok(Self {
config,
engine: Arc::new(engine),
cert_store: Arc::new(cert_store),
runtime_governor,
tls_diagnostics,
tls_learning,
flow_hooks,
upstream_tls_cache,
})
}
/// Returns the snapshot produced by this server's `TlsDiagnostics` at the time of the call.
pub fn tls_diagnostics_snapshot(&self) -> TlsDiagnosticsSnapshot {
self.tls_diagnostics.snapshot()
}
/// Returns another `Arc` handle to the same `TlsDiagnostics` instance the server uses.
pub fn tls_diagnostics_handle(&self) -> Arc<TlsDiagnostics> {
Arc::clone(&self.tls_diagnostics)
}
/// Returns the snapshot produced by this server's `TlsLearningGuardrails` at the time of
/// the call.
pub fn tls_learning_snapshot(&self) -> TlsLearningSnapshot {
self.tls_learning.snapshot()
}
/// Returns another `Arc` handle to the same `TlsLearningGuardrails` instance the server
/// uses.
pub fn tls_learning_handle(&self) -> Arc<TlsLearningGuardrails> {
Arc::clone(&self.tls_learning)
}
/// Returns the snapshot produced by this server's `RuntimeGovernor` at the time of the call.
pub fn runtime_observability_snapshot(&self) -> RuntimeObservabilitySnapshot {
self.runtime_governor.snapshot()
}
/// Returns another `Arc` handle to the same `RuntimeGovernor` instance the server uses.
pub fn runtime_observability_handle(&self) -> Arc<RuntimeGovernor> {
Arc::clone(&self.runtime_governor)
}
/// Submits `signal` to the TLS learning guardrails and returns their outcome.
///
/// The signal is not tied to a proxied connection, so a synthetic flow context is built
/// for it: a freshly allocated flow id, the placeholder client address `"<tls-learning>"`,
/// the signal's host, port 0, and the `Tunnel` protocol. The work itself is delegated to
/// `ingest_tls_learning_signal_with_audit`.
pub fn ingest_tls_learning_signal(&self, signal: TlsLearningSignal) -> TlsLearningOutcome {
    let flow_id = self.engine.allocate_flow_id();
    let client_addr = String::from("<tls-learning>");
    let server_host = signal.host.clone();

    let synthetic_context = FlowContext {
        flow_id,
        client_addr,
        server_host,
        server_port: 0,
        protocol: ApplicationProtocol::Tunnel,
    };

    ingest_tls_learning_signal_with_audit(
        &self.engine,
        &self.tls_learning,
        synthetic_context,
        signal,
    )
}
/// Adapts a mitmproxy TLS callback into an event, enriches it, emits it, and returns it.
///
/// Steps, in order:
/// 1. Adapt the callback and stamp TLS fingerprint provenance from the engine config.
/// 2. If the adapted event carries a failure: record it in the TLS diagnostics and attach
///    the returned `tls_failure_*` counters, then feed a learning signal (provider taken
///    from the `tls_ops_provider` attribute, falling back to `"mitmproxy"`) into the
///    guardrails and attach the `tls_learning_*` outcome fields.
/// 3. Emit an event carrying a copy of the adapted context and attributes.
pub fn ingest_mitmproxy_tls_callback(
    &self,
    callback: MitmproxyTlsCallback,
) -> MitmproxyTlsAdapterEvent {
    let mut adapted = adapt_mitmproxy_tls_callback(&callback);

    let engine_config = &self.engine.config;
    insert_tls_fingerprint_provenance(
        &mut adapted.attributes,
        engine_config.tls_fingerprint_mode,
        engine_config.tls_fingerprint_class,
    );

    if let Some(failure) = adapted.failure.as_ref() {
        let counters = self.tls_diagnostics.record_failure(
            &adapted.context.server_host,
            &failure.source,
            &failure.reason,
        );
        // Attribute keys paired with their values, inserted in this exact order.
        let failure_counts = [
            (
                "tls_failure_host_count",
                counters.host_total_failures.to_string(),
            ),
            (
                "tls_failure_host_rolling_count",
                counters.host_rolling_failures.to_string(),
            ),
            (
                "tls_failure_source_count",
                counters.source_total_failures.to_string(),
            ),
            (
                "tls_failure_reason_count",
                counters.reason_total_failures.to_string(),
            ),
            (
                "tls_failure_global_count",
                counters.global_total_failures.to_string(),
            ),
        ];
        for (key, value) in failure_counts {
            adapted.attributes.insert(key.to_string(), value);
        }

        // The provider defaults to "mitmproxy" when the adapter did not set one.
        let ops_provider = adapted
            .attributes
            .get("tls_ops_provider")
            .cloned()
            .unwrap_or_else(|| "mitmproxy".to_string());
        let learning_signal = TlsLearningSignal::new(
            adapted.context.server_host.clone(),
            failure.reason.clone(),
            failure.source.clone(),
            ops_provider,
            false,
        );
        let learning_outcome = ingest_tls_learning_signal_with_audit(
            &self.engine,
            &self.tls_learning,
            adapted.context.clone(),
            learning_signal,
        );

        let learning_fields = [
            (
                "tls_learning_decision",
                learning_outcome.decision.as_str().to_string(),
            ),
            (
                "tls_learning_reason_code",
                learning_outcome.reason_code.to_string(),
            ),
            (
                "tls_learning_host_count",
                learning_outcome.host_applied_total.to_string(),
            ),
            (
                "tls_learning_global_applied",
                learning_outcome.global_applied_total.to_string(),
            ),
            (
                "tls_learning_global_ignored",
                learning_outcome.global_ignored_total.to_string(),
            ),
        ];
        for (key, value) in learning_fields {
            adapted.attributes.insert(key.to_string(), value);
        }
    }

    // The adapted event is returned to the caller, so the emitted event gets copies.
    let mut event = Event::new(adapted.kind, adapted.context.clone());
    event.attributes = adapted.attributes.clone();
    self.engine.emit_event(event);

    adapted
}
/// Binds the listener and serves connections, consuming the server.
///
/// The listener comes from `bind_listener`. On unix targets serving is delegated to
/// `run_with_optional_unix_listener` (defined outside this part of the file); on all other
/// targets it is delegated to `run_with_listener`.
///
/// # Errors
///
/// Returns the bind error, or whatever error the delegated serve loop returns.
pub async fn run(self) -> io::Result<()> {
let listener = self.bind_listener().await?;
#[cfg(unix)]
{
self.run_with_optional_unix_listener(listener).await
}
#[cfg(not(unix))]
{
self.run_with_listener(listener).await
}
}
/// Creates the TCP listener by handing this server's configuration to
/// `bind_listener_with_socket_hardening`, and returns its result unchanged.
pub async fn bind_listener(&self) -> io::Result<TcpListener> {
bind_listener_with_socket_hardening(&self.config).await
}
pub async fn run_with_listener(self, listener: TcpListener) -> io::Result<()> {
let accept_retry_backoff_ms = self.config.accept_retry_backoff_ms.max(1);
loop {
let (stream, client_addr) = match listener.accept().await {
Ok(parts) => parts,
Err(error) if is_transient_listener_accept_error(&error) => {
tracing::warn!(
error = %error,
kind = ?error.kind(),
backoff_ms = accept_retry_backoff_ms,
"transient listener accept error; retrying"
);
tokio::time::sleep(Duration::from_millis(accept_retry_backoff_ms)).await;
continue;
}
Err(error) => return Err(error),
};
apply_per_connection_socket_hardening(&stream);
let runtime = RuntimeHandles {
engine: Arc::clone(&self.engine),
cert_store: Arc::clone(&self.cert_store),
runtime_governor: Arc::clone(&self.runtime_governor),
tls_diagnostics: Arc::clone(&self.tls_diagnostics),
tls_learning: Arc::clone(&self.tls_learning),
flow_hooks: Arc::clone(&self.flow_hooks),
upstream_tls_cache: Arc::clone(&self.upstream_tls_cache),
};
let max_connect_head_bytes = self.config.max_connect_head_bytes;
let max_http_head_bytes = self.config.max_http_head_bytes;
let client_addr = client_addr.to_string();
let flow_hooks_for_register = Arc::clone(&self.flow_hooks);
let flow_id_pre = self.engine.allocate_flow_id();
let join_handle = tokio::spawn(async move {
let accept_context = unknown_context(flow_id_pre, client_addr.clone());
let process_info = runtime
.flow_hooks
.resolve_process_info(accept_context.clone())
.await;
runtime
.flow_hooks
.on_connection_open(accept_context, process_info.clone())
.await;
if let Err(error) = handle_client(
runtime,
stream,
client_addr,
flow_id_pre,
process_info,
max_connect_head_bytes,
max_http_head_bytes,
&mut None,
)
.await
{
if !is_benign_socket_close_error(&error) {
tracing::warn!(error = %error, "connection handling failed");
}
}
});
flow_hooks_for_register
.register_task_abort_handle(flow_id_pre, join_handle.abort_handle());
}
}
}
/// Reports whether a failed `accept` is worth retrying instead of ending the accept loop.
///
/// Transient errors are the kinds `Interrupted`, `WouldBlock` and `ConnectionAborted` on
/// every platform, plus raw OS error codes 24 and 23 on unix targets.
fn is_transient_listener_accept_error(error: &io::Error) -> bool {
    let transient_kind = matches!(
        error.kind(),
        io::ErrorKind::Interrupted | io::ErrorKind::WouldBlock | io::ErrorKind::ConnectionAborted
    );
    if transient_kind {
        return true;
    }

    #[cfg(unix)]
    {
        // errno values for descriptor exhaustion (per-process and system-wide) on Linux
        // and the BSDs/macOS; matched numerically because std does not expose them.
        const EMFILE: i32 = 24;
        const ENFILE: i32 = 23;
        matches!(error.raw_os_error(), Some(EMFILE | ENFILE))
    }
    #[cfg(not(unix))]
    {
        false
    }
}
/// Tests for the accept-error classifier: every transient branch plus a negative case, so
/// a classifier that accepted everything would fail.
#[cfg(test)]
mod listener_accept_error_tests {
    use super::is_transient_listener_accept_error;
    use std::io;

    #[test]
    fn interrupted_accept_error_is_transient() {
        let err = io::Error::new(io::ErrorKind::Interrupted, "interrupted");
        assert!(is_transient_listener_accept_error(&err));
    }

    #[test]
    fn would_block_and_connection_aborted_are_transient() {
        for kind in [io::ErrorKind::WouldBlock, io::ErrorKind::ConnectionAborted] {
            let err = io::Error::new(kind, "transient");
            assert!(is_transient_listener_accept_error(&err));
        }
    }

    #[test]
    fn unrelated_accept_error_is_not_transient() {
        for kind in [io::ErrorKind::PermissionDenied, io::ErrorKind::AddrInUse] {
            let err = io::Error::new(kind, "fatal");
            assert!(!is_transient_listener_accept_error(&err));
        }
    }

    #[cfg(unix)]
    #[test]
    fn emfile_accept_error_is_transient() {
        let err = io::Error::from_raw_os_error(24);
        assert!(is_transient_listener_accept_error(&err));
    }

    #[cfg(unix)]
    #[test]
    fn enfile_accept_error_is_transient() {
        // 23 is the second raw OS error code the classifier treats as transient.
        let err = io::Error::from_raw_os_error(23);
        assert!(is_transient_listener_accept_error(&err));
    }
}
mod close_codes;
mod http_body_relay;
mod http_head_parser;
mod http_head_parser_api;
mod http_head_parser_smuggling;
pub(crate) mod clienthello_parser;
mod downstream_tls;
mod tls_profile_mapping;
mod tls_revocation_metadata;
mod io_timeouts;
mod shutdown_control;
mod socket_hardening;
mod route_planner;
mod route_planner_model;
mod route_planner_transport;
mod flow_policy_snapshot;
mod event_emitters;
mod event_emitters_protocol;
mod grpc_stream_observer;
mod ndjson_stream_observer;
mod sse_stream_observer;
mod websocket_codec;
mod websocket_events;
mod websocket_handshake_validation;
mod websocket_relay;
mod websocket_relay_io;
mod websocket_relay_support;
mod websocket_turn_tracker;
mod http2_relay_support;
mod http2_stream_hook_dispatch;
mod http2_stream_relay;
mod http2_stream_relay_body;
mod http2_stream_relay_http1;
mod http2_stream_relay_http1_body;
mod http2_stream_relay_http1_response_relay;
mod http2_stream_relay_http1_stream;
mod http2_stream_relay_stream;
mod http2_stream_response_relay;
mod flow_connect_tunnel;
mod flow_connect_tunnel_support;
mod flow_forward_proxy_http1;
mod flow_forward_proxy_http1_helpers;
mod flow_hook_http_helpers;
mod flow_intercept;
mod flow_intercept_http1;
mod flow_intercept_http1_response;
mod flow_intercept_tls_failure;
#[cfg(unix)]
mod local_capture_unix;
#[cfg(feature = "__internal")]
pub use http_head_parser_api::{parse_http1_request_head_bytes, parse_http1_response_head_bytes};