use std::collections::HashMap;
use std::process::ExitCode;
use std::sync::Arc;
use osproxy_config::{
AdminPassthroughConfig, Config, DiagBaseline, ObservabilityConfig, PassthroughConfig, TlsConfig,
};
use osproxy_core::{ClusterId, IndexName, SystemClock};
use osproxy_engine::{AdminPolicy, PassthroughPolicy, Pipeline};
use osproxy_observe::{DiagLevel, DirectiveStore};
use osproxy_otlp::OtlpHttpExporter;
use osproxy_server::auth::ReferenceAuthenticator;
use osproxy_server::cursor::HmacCursorSigner;
use osproxy_server::directive::HmacDirectiveVerifier;
use osproxy_server::handler::AppHandler;
use osproxy_server::log::{NoLog, RequestLog, StdoutDiagnosticSink, StdoutJsonLog};
use osproxy_server::tenancy::ReferenceTenancy;
use osproxy_sink::{OpenSearchSink, Reader, Sink};
use osproxy_tenancy::{Router, TenancyRouter};
use osproxy_transport::{DefaultCryptoProvider, IngressHandler};
use tokio::net::TcpListener;
mod capture;
mod directive_wiring;
use directive_wiring::{directive_store, with_directive_admin};
mod fanout;
// Registers mimalloc's `MiMalloc` as the global allocator for this binary.
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
/// Process entry point: awaits [`run`] on the Tokio runtime and turns its
/// outcome into the process exit status.
///
/// A failure message is written to stderr with an `osproxy: ` prefix before
/// the process exits with [`ExitCode::FAILURE`].
#[tokio::main]
async fn main() -> ExitCode {
    if let Err(message) = run().await {
        eprintln!("osproxy: {message}");
        return ExitCode::FAILURE;
    }
    ExitCode::SUCCESS
}
/// Performs the whole startup sequence and then serves until shutdown.
///
/// In order: loads the configuration from the command-line arguments, builds
/// the sink, tenancy and pipeline, wraps them in the application handler,
/// binds the main listener, optionally spawns the gRPC listener, and finally
/// serves HTTP or HTTPS depending on whether TLS is configured.
///
/// # Errors
///
/// Returns the failure rendered as a `String` when configuration loading,
/// directive-store setup, fan-out or capture attachment, binding either
/// listener, loading the TLS material, or serving fails.
#[allow(
    clippy::too_many_lines,
    reason = "the binary's top-level wiring: load config, build the sink/tenancy/\
              pipeline/handler, then bind and serve. It is one sequential startup \
              sequence; extracting fragments would only scatter the wiring it exists \
              to centralize (docs/08 §3)."
)]
async fn run() -> Result<(), String> {
    let cfg = Config::load(std::env::args().skip(1)).map_err(|e| e.to_string())?;

    // Storage and routing: a fixed "default" cluster id plus the configured
    // shared index and upstream.
    let default_cluster = ClusterId::from("default");
    let backend = OpenSearchSink::new();
    let shared_index = IndexName::from(cfg.index.as_str());
    let tenancy = ReferenceTenancy::new(default_cluster, shared_index, cfg.upstream.clone());

    // The directive store yields a read half for the pipeline and an admin
    // half that is wired into the handler further down.
    let (directive_read, directive_admin) = directive_store(&cfg).await?;
    let pipeline = fanout::attach(
        assemble_pipeline(tenancy, backend, directive_read, &cfg),
        &cfg,
    )
    .await?;

    // An empty token table is reported as the open "dev" mode in the banner.
    let tokens = cfg
        .tokens
        .iter()
        .cloned()
        .collect::<HashMap<String, String>>();
    let auth_mode = if tokens.is_empty() { "dev (open)" } else { "token" };

    // Handler options are applied one step at a time, in the same order as the
    // startup banners they print.
    let app = AppHandler::new(pipeline, ReferenceAuthenticator::new(tokens));
    let app = app.with_request_log(request_log(cfg.observability.log_requests));
    let app = app.with_require_tls_for_mutation(cfg.require_tls_for_mutation);
    let app = app.with_debug_endpoints(debug_endpoints(cfg.observability.debug_endpoints));
    let app = app.with_forward_policy(forward_policy(&cfg));
    let app = capture::attach(app, &cfg).await?;

    let admin_token = cfg.observability.directive_admin_token.as_deref();
    let handler = Arc::new(with_directive_admin(app, directive_admin, admin_token));

    let listener = match TcpListener::bind(cfg.bind).await {
        Ok(bound) => bound,
        Err(e) => return Err(format!("binding {}: {e}", cfg.bind)),
    };
    let provider = load_tls_provider(cfg.tls.as_ref())?.map(Arc::new);

    // The gRPC listener is optional and is served from its own spawned task.
    if let Some(grpc_bind) = cfg.grpc_bind {
        let grpc_listener = match TcpListener::bind(grpc_bind).await {
            Ok(bound) => bound,
            Err(e) => return Err(format!("binding gRPC {grpc_bind}: {e}")),
        };
        spawn_grpc(
            grpc_listener,
            provider.clone(),
            Arc::clone(&handler),
            &grpc_bind.to_string(),
        );
    }

    let (bind, upstream, index) = (cfg.bind, &cfg.upstream, &cfg.index);
    match provider {
        Some(provider) => {
            println!(
                "osproxy listening on https://{bind}, upstream {upstream}, shared index {index}, auth {auth_mode}"
            );
            osproxy_transport::serve_tls_with_shutdown(
                listener,
                provider,
                handler,
                shutdown_signal(),
            )
            .await
            .map_err(|e| format!("serving: {e}"))
        }
        None => {
            println!(
                "osproxy listening on http://{bind}, upstream {upstream}, shared index {index}, auth {auth_mode}"
            );
            osproxy_transport::serve_with_shutdown(listener, handler, shutdown_signal())
                .await
                .map_err(|e| format!("serving: {e}"))
        }
    }
}
/// Spawns a detached task that serves gRPC on `listener`.
///
/// With a TLS `provider` the task runs `serve_grpc_tls` and the banner shows
/// `grpcs://`; without one it runs `serve_grpc` and shows `grpc://`.
/// `grpc_bind` is only used for the banner text. A serve error is written to
/// stderr from inside the task; it is not returned to the caller.
fn spawn_grpc<H>(
    listener: TcpListener,
    provider: Option<Arc<DefaultCryptoProvider>>,
    handler: Arc<H>,
    grpc_bind: &str,
) where
    H: IngressHandler,
{
    match provider {
        Some(provider) => {
            println!("osproxy gRPC listening on grpcs://{grpc_bind}");
            tokio::spawn(async move {
                let outcome =
                    osproxy_transport::serve_grpc_tls(listener, provider, handler).await;
                if let Err(e) = outcome {
                    eprintln!("osproxy: gRPC serve error: {e}");
                }
            });
        }
        None => {
            println!("osproxy gRPC listening on grpc://{grpc_bind}");
            tokio::spawn(async move {
                let outcome = osproxy_transport::serve_grpc(listener, handler).await;
                if let Err(e) = outcome {
                    eprintln!("osproxy: gRPC serve error: {e}");
                }
            });
        }
    }
}
async fn shutdown_signal() {
let _ = tokio::signal::ctrl_c().await;
println!("osproxy: shutdown signal received, draining in-flight requests");
}
/// Picks the request logger for the handler.
///
/// When `enabled`, announces structured logging on stdout and returns
/// [`StdoutJsonLog`]; otherwise returns [`NoLog`] without printing anything.
fn request_log(enabled: bool) -> Box<dyn RequestLog> {
    if !enabled {
        return Box::new(NoLog);
    }
    println!("osproxy structured request logging: on (stdout JSON)");
    Box::new(StdoutJsonLog)
}
/// Prints whether the `/debug` endpoints are on or off and returns `enabled`
/// unchanged, so the call can sit inline in a builder chain.
fn debug_endpoints(enabled: bool) -> bool {
    let banner = if enabled {
        "osproxy /debug endpoints: on (disable with OSPROXY_DEBUG_ENDPOINTS=false in prod)"
    } else {
        "osproxy /debug endpoints: off"
    };
    println!("{banner}");
    enabled
}
/// Builds the handler's `ForwardPolicy` from the `header_forwarding` section
/// of the configuration, copying its `enabled` flag and cloning its `deny`
/// list.
fn forward_policy(cfg: &Config) -> osproxy_server::forward_headers::ForwardPolicy {
    use osproxy_server::forward_headers::ForwardPolicy;

    let forwarding = &cfg.header_forwarding;
    ForwardPolicy {
        enabled: forwarding.enabled,
        deny: forwarding.deny.clone(),
    }
}
/// Attaches an OTLP HTTP span exporter when `obs.otlp_endpoint` is set.
///
/// With no endpoint the pipeline is returned untouched. Otherwise the export
/// target is announced on stdout, and the pipeline gets an
/// [`OtlpHttpExporter`] for that endpoint plus the configured service name.
fn with_otlp_export<R, S>(pipeline: Pipeline<R, S>, obs: &ObservabilityConfig) -> Pipeline<R, S>
where
    R: Router,
    S: Sink + Reader,
{
    match obs.otlp_endpoint.as_deref() {
        None => pipeline,
        Some(endpoint) => {
            let service = obs.service_name.clone();
            println!("osproxy OTLP span export -> {endpoint}/v1/traces (service={service})");
            pipeline
                .with_exporter(Arc::new(OtlpHttpExporter::new(endpoint)))
                .with_service_name(service)
        }
    }
}
/// Installs [`StdoutDiagnosticSink`] as the pipeline's diagnostic sink when
/// `enabled`, announcing it on stdout; otherwise returns the pipeline as is.
fn with_diagnostic_capture_log<R, S>(pipeline: Pipeline<R, S>, enabled: bool) -> Pipeline<R, S>
where
    R: Router,
    S: Sink + Reader,
{
    if enabled {
        println!("osproxy diagnostic captures -> stdout (kind=diagnostic_capture)");
        pipeline.with_diagnostic_sink(Arc::new(StdoutDiagnosticSink))
    } else {
        pipeline
    }
}
/// Sets the pipeline's baseline diagnostic level from the configured
/// [`DiagBaseline`], printing the chosen baseline on stdout.
///
/// Each configuration variant maps to the [`DiagLevel`] variant of the same
/// name.
fn with_diag_baseline<R, S>(pipeline: Pipeline<R, S>, baseline: DiagBaseline) -> Pipeline<R, S>
where
    R: Router,
    S: Sink + Reader,
{
    println!("osproxy diagnostics baseline: {}", baseline.as_str());
    pipeline.with_baseline_level(match baseline {
        DiagBaseline::Off => DiagLevel::Off,
        DiagBaseline::Shape => DiagLevel::Shape,
        DiagBaseline::ShapeTiming => DiagLevel::ShapeTiming,
        DiagBaseline::ShapeRewriteDiff => DiagLevel::ShapeRewriteDiff,
    })
}
/// Turns on the `X-Debug-Directive` header channel when a key is configured.
///
/// With `key == None` the pipeline is returned unchanged. Otherwise an
/// [`HmacDirectiveVerifier`] built from the key bytes and a [`SystemClock`] is
/// installed as the pipeline's directive verifier, and a banner is printed.
fn with_debug_directive<R, S>(pipeline: Pipeline<R, S>, key: Option<&str>) -> Pipeline<R, S>
where
    R: Router,
    S: Sink + Reader,
{
    match key {
        None => pipeline,
        Some(secret) => {
            println!("osproxy X-Debug-Directive header channel: on (HMAC-verified)");
            let clock = Arc::new(SystemClock);
            let verifier = HmacDirectiveVerifier::new(secret.as_bytes(), clock);
            pipeline.with_directive_verifier(Arc::new(verifier))
        }
    }
}
/// Builds the request pipeline from its parts and applies every optional
/// feature selected in `cfg`.
///
/// Steps run in a fixed order: baseline capture, OTLP export, diagnostics
/// baseline, debug-directive channel, directive store, diagnostic capture
/// log, cursor affinity, admin pass-through, and finally passthrough.
fn assemble_pipeline(
    tenancy: ReferenceTenancy,
    sink: OpenSearchSink,
    directive_store: Arc<dyn DirectiveStore>,
    cfg: &Config,
) -> Pipeline<TenancyRouter<ReferenceTenancy>, OpenSearchSink> {
    let obs = &cfg.observability;

    // Core pipeline: the tenancy router in front of the sink.
    let pipeline =
        Pipeline::new(TenancyRouter::new(tenancy), sink).with_baseline_capture(cfg.capture_default);

    // Observability layers.
    let pipeline = with_otlp_export(pipeline, obs);
    let pipeline = with_diag_baseline(pipeline, obs.diag_baseline);
    let pipeline = with_debug_directive(pipeline, obs.debug_directive_key.as_deref());
    let pipeline = pipeline.with_directive_store(directive_store);
    let pipeline = with_diagnostic_capture_log(pipeline, obs.log_diagnostic_captures);

    // Routing layers.
    let pipeline = with_cursor_affinity(pipeline, cfg.cursor_affinity_key.as_deref());
    let pipeline = with_admin_passthrough(pipeline, cfg.admin_passthrough.as_ref());
    with_passthrough(pipeline, cfg.passthrough.as_ref())
}
/// Installs a [`PassthroughPolicy`] when passthrough is configured.
///
/// With `passthrough == None` the pipeline is returned unchanged. Otherwise
/// the policy is built from the configured cluster, endpoint and index
/// prefixes. The banner describes the scope as "all requests" when the prefix
/// list is empty, and lists the prefixes otherwise.
fn with_passthrough(
    pipeline: Pipeline<TenancyRouter<ReferenceTenancy>, OpenSearchSink>,
    passthrough: Option<&PassthroughConfig>,
) -> Pipeline<TenancyRouter<ReferenceTenancy>, OpenSearchSink> {
    match passthrough {
        None => pipeline,
        Some(pt) => {
            let scope = if pt.index_prefixes.is_empty() {
                "all requests".to_owned()
            } else {
                format!("indices {:?}", pt.index_prefixes)
            };
            println!(
                "osproxy passthrough: on (forwarding {scope} verbatim to cluster={} at {})",
                pt.cluster, pt.endpoint,
            );

            let target = ClusterId::from(pt.cluster.as_str());
            let policy = PassthroughPolicy::new(target, pt.endpoint.clone())
                .with_index_prefixes(pt.index_prefixes.clone());
            pipeline.with_passthrough(policy)
        }
    }
}
/// Installs an [`AdminPolicy`] when admin pass-through is configured.
///
/// With `admin == None` the pipeline is returned unchanged. Otherwise the
/// policy is built from the configured cluster and prefixes, and receives the
/// configured endpoint only when one is present.
fn with_admin_passthrough<R, S>(
    pipeline: Pipeline<R, S>,
    admin: Option<&AdminPassthroughConfig>,
) -> Pipeline<R, S>
where
    R: Router,
    S: Sink + Reader,
{
    match admin {
        None => pipeline,
        Some(admin) => {
            println!(
                "osproxy admin pass-through: on (cluster={}, prefixes={:?})",
                admin.cluster, admin.prefixes
            );
            let base = AdminPolicy::new(
                ClusterId::from(admin.cluster.as_str()),
                admin.prefixes.clone(),
            );
            let policy = match &admin.endpoint {
                Some(endpoint) => base.with_endpoint(endpoint.clone()),
                None => base,
            };
            pipeline.with_admin_passthrough(policy)
        }
    }
}
/// Installs an [`HmacCursorSigner`] built from `key` as the pipeline's cursor
/// signer and prints the scroll/PIT affinity banner; with `key == None` the
/// pipeline is returned unchanged.
fn with_cursor_affinity<R, S>(pipeline: Pipeline<R, S>, key: Option<&str>) -> Pipeline<R, S>
where
    R: Router,
    S: Sink + Reader,
{
    if let Some(secret) = key {
        println!("osproxy scroll/PIT cursor affinity: on (HMAC-signed envelope)");
        let signer = HmacCursorSigner::new(secret.as_bytes());
        pipeline.with_cursor_signer(Arc::new(signer))
    } else {
        pipeline
    }
}
/// Loads the TLS material named in `tls` and builds a crypto provider.
///
/// Returns `Ok(None)` when TLS is not configured. Otherwise the certificate
/// and key files are read; when `client_ca_path` is set the CA file is read
/// too and the provider is built with `from_pem_mtls`, else with `from_pem`.
///
/// # Errors
///
/// Returns a message naming the file when a read fails, or a
/// `building TLS config: …` message when the provider cannot be built.
fn load_tls_provider(tls: Option<&TlsConfig>) -> Result<Option<DefaultCryptoProvider>, String> {
    match tls {
        None => Ok(None),
        Some(tls) => {
            let cert_pem = std::fs::read(&tls.cert_path)
                .map_err(|e| format!("reading {}: {e}", tls.cert_path))?;
            let key_pem = std::fs::read(&tls.key_path)
                .map_err(|e| format!("reading {}: {e}", tls.key_path))?;

            let built = if let Some(ca) = &tls.client_ca_path {
                let ca_pem = std::fs::read(ca).map_err(|e| format!("reading {ca}: {e}"))?;
                DefaultCryptoProvider::from_pem_mtls(&cert_pem, &key_pem, &ca_pem)
            } else {
                DefaultCryptoProvider::from_pem(&cert_pem, &key_pem)
            };
            built
                .map(Some)
                .map_err(|e| format!("building TLS config: {e}"))
        }
    }
}