#[cfg(feature = "http-client")]
use http::HeaderMap;
use crate::{
error::{Error, ErrorCode},
shared::MemChr,
types::Message,
};
use futures_util::TryFutureExt;
use std::{borrow::Cow, fmt::Display};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio_util::sync::CancellationToken;
#[cfg(feature = "http-server")]
use std::time::Duration;
use super::{Receiver as TransportReceiver, Sender as TransportSender, Transport};
#[cfg(all(feature = "http-client", feature = "client-tls"))]
use crate::transport::http::client::tls_config::{
ClientTlsConfig, TlsConfig as McpClientTlsConfig,
};
#[cfg(all(feature = "http-server-volga", feature = "server-tls"))]
pub use volga::tls::{DevCertMode, TlsConfig};
#[cfg(feature = "http-server-volga")]
pub use server::VolgaEngine;
#[cfg(feature = "http-server")]
pub use core::{
context::HttpContext,
engine::HttpEngine,
handlers,
types::{HttpRequest, HttpResponse, SseResponse},
};
#[cfg(feature = "http-client")]
pub(crate) mod client;
#[cfg(feature = "http-server")]
pub mod core;
#[cfg(feature = "http-server-volga")]
pub(crate) mod server;
/// Name of the HTTP header that `get_mcp_session_id` reads the session id from.
#[cfg(feature = "http-client")]
pub(super) const MCP_SESSION_ID: &str = "Mcp-Session-Id";
/// Address used by `ServiceUrl::default` and as the fallback in `ServiceUrl::from`.
const DEFAULT_ADDR: &str = "127.0.0.1:3000";
/// Endpoint path used by `ServiceUrl::default` and as the fallback in `ServiceUrl::from`.
const DEFAULT_MCP_ENDPOINT: &str = "/mcp";
/// Initial value of `HttpServer::sse_buffer_capacity` in every constructor of this module.
#[cfg(feature = "http-server")]
pub(crate) const DEFAULT_SSE_BUFFER_CAPACITY: usize = 64;
/// Initial value of `HttpServer::sse_live_queue_capacity` in every constructor of this module.
#[cfg(feature = "http-server")]
pub(crate) const DEFAULT_SSE_LIVE_QUEUE_CAPACITY: usize = 256;
/// Initial value of `HttpServer::sse_log_queue_capacity` in every constructor of this module.
#[cfg(feature = "http-server")]
pub(crate) const DEFAULT_SSE_LOG_QUEUE_CAPACITY: usize = 256;
/// Initial value of `HttpServer::sse_cleanup_interval` (300 seconds).
#[cfg(feature = "http-server")]
pub(crate) const DEFAULT_SSE_CLEANUP_INTERVAL: Duration = Duration::from_secs(300);
/// Initial value of `HttpServer::sse_session_ttl` (1800 seconds).
#[cfg(feature = "http-server")]
pub(crate) const DEFAULT_SSE_SESSION_TTL: Duration = Duration::from_secs(1800);
/// Extracts the session id carried in the `Mcp-Session-Id` header.
///
/// Returns `None` when the header is absent, when its value cannot be
/// viewed as a string, or when that string does not parse as a UUID.
#[inline]
#[cfg(feature = "http-client")]
pub(super) fn get_mcp_session_id(headers: &HeaderMap) -> Option<uuid::Uuid> {
    let raw_value = headers.get(MCP_SESSION_ID)?;
    let text = raw_value.to_str().ok()?;
    uuid::Uuid::parse_str(text).ok()
}
/// URL scheme of a `ServiceUrl`; its `Display` impl writes `http` or `https`.
#[derive(Debug, Clone, Copy)]
pub(crate) enum HttpProto {
/// Plain HTTP; the scheme chosen by `ServiceUrl::default`.
Http,
/// HTTPS; only compiled in when the `server-tls` or `client-tls` feature is enabled.
#[cfg(any(feature = "server-tls", feature = "client-tls"))]
Https,
}
/// HTTP server transport, generic over a claims marker type `C` and an
/// [`HttpEngine`] implementation `E` that is run when the transport starts.
#[cfg(feature = "http-server")]
pub struct HttpServer<C, E>
where
E: HttpEngine,
{
/// Scheme, address and endpoint of the server; address and endpoint are
/// copied into the `HttpContext` on start.
url: ServiceUrl,
/// The engine to run; `start` takes it out, leaving `None` behind.
engine: Option<E>,
/// Value passed to `SseSessionRegistry::new` on start.
sse_buffer_capacity: usize,
/// Copied into `HttpContext::sse_live_queue_capacity` on start.
sse_live_queue_capacity: usize,
/// Copied into `HttpContext::sse_log_queue_capacity` on start.
sse_log_queue_capacity: usize,
/// Interval handed to `cleanup_stale_sessions` on start.
sse_cleanup_interval: Duration,
/// Session TTL handed to `cleanup_stale_sessions` on start.
sse_session_ttl: Duration,
/// Sending side of the transport; its stored receiver half is handed to
/// the dispatch task on start.
sender: HttpSender,
/// Receiving side of the transport; a clone of its sender half becomes
/// `HttpContext::inbound_tx` on start.
receiver: HttpReceiver,
/// Zero-sized marker that ties the type parameter `C` to the struct
/// without storing a value of that type.
_claims: std::marker::PhantomData<fn() -> C>,
}
/// HTTP client transport; configured through builder methods and turned
/// into a `ClientRuntimeContext` when started.
#[cfg(feature = "http-client")]
pub struct HttpClient {
/// Scheme, address and endpoint the client is configured with.
url: ServiceUrl,
/// Bytes of the access token supplied through `with_auth`, if any.
access_token: Option<Box<[u8]>>,
/// TLS settings supplied through `with_tls`, if any.
#[cfg(feature = "client-tls")]
tls_config: Option<McpClientTlsConfig>,
/// Sending side of the transport; its stored receiver half is moved into
/// the runtime context on start.
sender: HttpSender,
/// Receiving side of the transport; a clone of its sender half is moved
/// into the runtime context on start.
receiver: HttpReceiver,
}
/// Parts of a service URL; `as_str` renders them as `<proto>://<addr><endpoint>`.
#[derive(Debug, Clone, Copy)]
pub(crate) struct ServiceUrl {
/// URL scheme.
proto: HttpProto,
/// Address part, written right after `://` (the default is `127.0.0.1:3000`).
addr: &'static str,
/// Endpoint part, appended directly after the address with no separator
/// inserted (the default is `/mcp`).
endpoint: &'static str,
}
/// Everything `HttpClient::runtime` extracts from the client before it is
/// handed to `client::connect`.
#[cfg(feature = "http-client")]
pub(super) struct ClientRuntimeContext {
/// Copy of the client's URL.
url: ServiceUrl,
/// Clone of the `HttpReceiver` sender half; results pushed here are what
/// `HttpReceiver::recv` yields.
tx: Sender<Result<Message, Error>>,
/// Receiver half taken out of the client's `HttpSender`.
rx: Receiver<Message>,
/// Access token moved out of the client, if one was configured.
access_token: Option<Box<[u8]>>,
/// Result of calling `build()` on the client's TLS settings, if any were configured.
#[cfg(feature = "client-tls")]
tls_config: Option<ClientTlsConfig>,
}
/// Sending half of the HTTP transport, backed by a tokio mpsc channel of [`Message`]s.
pub(crate) struct HttpSender {
/// Channel end that `send` pushes messages into.
tx: Sender<Message>,
/// Matching receiver end. `new` stores it as `Some`, clones carry `None`,
/// and starting the transport takes it out.
rx: Option<Receiver<Message>>,
}
/// Receiving half of the HTTP transport, backed by a tokio mpsc channel of
/// `Result<Message, Error>` items.
pub(crate) struct HttpReceiver {
/// Sender end; cloned into the server context / client runtime on start.
tx: Sender<Result<Message, Error>>,
/// Receiver end that `recv` reads from.
rx: Receiver<Result<Message, Error>>,
}
/// Diagnostic formatting for [`HttpServer`].
///
/// Prints the URL, the engine and the SSE tuning values; the channel halves
/// and the claims marker are not included.
#[cfg(feature = "http-server")]
impl<C, E> std::fmt::Debug for HttpServer<C, E>
where
    E: HttpEngine + std::fmt::Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("HttpServer");
        out.field("url", &self.url);
        out.field("engine", &self.engine);
        out.field("sse_buffer_capacity", &self.sse_buffer_capacity);
        out.field("sse_live_queue_capacity", &self.sse_live_queue_capacity);
        out.field("sse_log_queue_capacity", &self.sse_log_queue_capacity);
        out.field("sse_cleanup_interval", &self.sse_cleanup_interval);
        out.field("sse_session_ttl", &self.sse_session_ttl);
        out.finish()
    }
}
#[cfg(feature = "http-server-volga")]
impl Default for HttpServer<server::DefaultClaims, server::VolgaEngine> {
    /// Builds a server with the default URL, a default `VolgaEngine`, the
    /// module's default SSE settings and fresh sender/receiver channels.
    #[inline]
    fn default() -> Self {
        let url = ServiceUrl::default();
        let engine = Some(server::VolgaEngine::default());
        let receiver = HttpReceiver::new();
        let sender = HttpSender::new();
        Self {
            url,
            engine,
            sender,
            receiver,
            sse_buffer_capacity: DEFAULT_SSE_BUFFER_CAPACITY,
            sse_live_queue_capacity: DEFAULT_SSE_LIVE_QUEUE_CAPACITY,
            sse_log_queue_capacity: DEFAULT_SSE_LOG_QUEUE_CAPACITY,
            sse_cleanup_interval: DEFAULT_SSE_CLEANUP_INTERVAL,
            sse_session_ttl: DEFAULT_SSE_SESSION_TTL,
            _claims: std::marker::PhantomData,
        }
    }
}
#[cfg(feature = "http-client")]
impl Default for HttpClient {
    /// Builds a client with the default URL, no access token, no TLS
    /// settings and fresh sender/receiver channels.
    #[inline]
    fn default() -> Self {
        let url = ServiceUrl::default();
        let receiver = HttpReceiver::new();
        let sender = HttpSender::new();
        Self {
            url,
            sender,
            receiver,
            access_token: None,
            #[cfg(feature = "client-tls")]
            tls_config: None,
        }
    }
}
/// Diagnostic formatting for [`HttpClient`].
///
/// Only the URL is printed; the access token, TLS settings and channel
/// halves are not included in the output.
#[cfg(feature = "http-client")]
impl std::fmt::Debug for HttpClient {
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("HttpClient");
        out.field("url", &self.url);
        out.finish()
    }
}
impl ServiceUrl {
    /// Renders the URL as `<proto>://<addr><endpoint>`.
    ///
    /// The text is formatted on every call, so the returned [`Cow`] is
    /// always the owned variant.
    #[inline]
    pub(crate) fn as_str<'a>(&self) -> Cow<'a, str> {
        let Self {
            proto,
            addr,
            endpoint,
        } = self;
        let rendered = format!("{proto}://{addr}{endpoint}");
        Cow::Owned(rendered)
    }
}
impl Display for HttpProto {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self {
HttpProto::Http => f.write_str("http"),
#[cfg(any(feature = "server-tls", feature = "client-tls"))]
HttpProto::Https => f.write_str("https"),
}
}
}
impl Default for ServiceUrl {
    /// Plain-HTTP URL made of `DEFAULT_ADDR` and `DEFAULT_MCP_ENDPOINT`.
    #[inline]
    fn default() -> Self {
        let (addr, endpoint) = (DEFAULT_ADDR, DEFAULT_MCP_ENDPOINT);
        Self {
            proto: HttpProto::Http,
            addr,
            endpoint,
        }
    }
}
impl Display for ServiceUrl {
    /// Formats the URL by rendering it with `as_str` and delegating to the
    /// string's own `Display` implementation.
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rendered = self.as_str();
        Display::fmt(&rendered, f)
    }
}
impl From<&'static str> for ServiceUrl {
    /// Builds a plain-HTTP [`ServiceUrl`] from `url` by splitting it on `/`.
    ///
    /// Missing pieces fall back to `DEFAULT_ADDR` and `DEFAULT_MCP_ENDPOINT`.
    #[inline]
    fn from(url: &'static str) -> Self {
        let mut segments = MemChr::split(url, b'/');
        let addr = segments.nth(0).unwrap_or(DEFAULT_ADDR);
        // NOTE(review): if `MemChr::split` yields an ordinary iterator, the
        // `nth(0)` above already consumed the first segment, so `nth(1)` skips
        // one more and reads the third segment of the input. The separator is
        // presumably not part of the segment either. Confirm this is the
        // intended endpoint.
        let endpoint = segments.nth(1).unwrap_or(DEFAULT_MCP_ENDPOINT);
        Self {
            proto: HttpProto::Http,
            addr,
            endpoint,
        }
    }
}
impl Clone for HttpSender {
#[inline]
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
rx: None,
}
}
}
impl HttpSender {
    /// Creates a sender backed by a new bounded channel of capacity 100 and
    /// keeps the matching receiver half until the transport is started.
    pub(crate) fn new() -> Self {
        const CHANNEL_CAPACITY: usize = 100;
        let (tx, receiver_half) = mpsc::channel(CHANNEL_CAPACITY);
        Self {
            tx,
            rx: Some(receiver_half),
        }
    }
}
impl HttpReceiver {
    /// Creates a receiver backed by a new bounded channel of capacity 100,
    /// keeping both ends of that channel.
    pub(crate) fn new() -> Self {
        const CHANNEL_CAPACITY: usize = 100;
        let (tx, rx) = mpsc::channel(CHANNEL_CAPACITY);
        Self { tx, rx }
    }
}
#[cfg(feature = "http-server")]
impl<E> HttpServer<crate::auth::DefaultClaims, E>
where
    E: HttpEngine,
{
    /// Creates a server for `addr` that will run the supplied `engine`.
    ///
    /// Everything else starts from the defaults: plain HTTP, the default
    /// endpoint, the module's default SSE settings and fresh channels.
    pub fn from_engine(addr: &'static str, engine: E) -> Self {
        let mut url = ServiceUrl::default();
        url.addr = addr;
        let receiver = HttpReceiver::new();
        let sender = HttpSender::new();
        Self {
            url,
            engine: Some(engine),
            sender,
            receiver,
            sse_buffer_capacity: DEFAULT_SSE_BUFFER_CAPACITY,
            sse_live_queue_capacity: DEFAULT_SSE_LIVE_QUEUE_CAPACITY,
            sse_log_queue_capacity: DEFAULT_SSE_LOG_QUEUE_CAPACITY,
            sse_cleanup_interval: DEFAULT_SSE_CLEANUP_INTERVAL,
            sse_session_ttl: DEFAULT_SSE_SESSION_TTL,
            _claims: std::marker::PhantomData,
        }
    }
}
#[cfg(feature = "http-server")]
impl<C, E> HttpServer<C, E>
where
    E: HttpEngine,
{
    /// Replaces the address part of the server URL.
    pub fn bind(self, addr: &'static str) -> Self {
        Self {
            url: ServiceUrl { addr, ..self.url },
            ..self
        }
    }

    /// Replaces the endpoint part of the server URL.
    pub fn with_endpoint(self, prefix: &'static str) -> Self {
        Self {
            url: ServiceUrl {
                endpoint: prefix,
                ..self.url
            },
            ..self
        }
    }

    /// Swaps the engine for `engine`, keeping the URL, the SSE settings and
    /// the existing channels. The previous engine is dropped.
    pub fn with_engine<E2>(self, engine: E2) -> HttpServer<C, E2>
    where
        E2: HttpEngine,
    {
        let Self {
            url,
            sse_buffer_capacity,
            sse_live_queue_capacity,
            sse_log_queue_capacity,
            sse_cleanup_interval,
            sse_session_ttl,
            sender,
            receiver,
            ..
        } = self;
        HttpServer {
            url,
            engine: Some(engine),
            sse_buffer_capacity,
            sse_live_queue_capacity,
            sse_log_queue_capacity,
            sse_cleanup_interval,
            sse_session_ttl,
            sender,
            receiver,
            _claims: std::marker::PhantomData,
        }
    }

    /// Sets the capacity handed to the SSE session registry on start.
    ///
    /// Unlike the queue setters below, this one does not reject `0`.
    pub fn with_sse_buffer(self, capacity: usize) -> Self {
        Self {
            sse_buffer_capacity: capacity,
            ..self
        }
    }

    /// Sets the SSE live queue capacity copied into the HTTP context on start.
    ///
    /// # Panics
    /// Panics when `capacity` is `0`.
    pub fn with_sse_live_queue(self, capacity: usize) -> Self {
        assert!(
            capacity != 0,
            "SSE live queue capacity must be greater than 0"
        );
        Self {
            sse_live_queue_capacity: capacity,
            ..self
        }
    }

    /// Sets the SSE log queue capacity copied into the HTTP context on start.
    ///
    /// # Panics
    /// Panics when `capacity` is `0`.
    pub fn with_sse_log_queue(self, capacity: usize) -> Self {
        assert!(
            capacity != 0,
            "SSE log queue capacity must be greater than 0"
        );
        Self {
            sse_log_queue_capacity: capacity,
            ..self
        }
    }

    /// Sets the interval handed to the stale-session cleanup task on start.
    ///
    /// # Panics
    /// Panics when `interval` is zero.
    pub fn with_sse_cleanup_interval(self, interval: Duration) -> Self {
        assert!(
            interval != Duration::ZERO,
            "SSE cleanup interval must be greater than 0"
        );
        Self {
            sse_cleanup_interval: interval,
            ..self
        }
    }

    /// Sets the session TTL handed to the stale-session cleanup task on start.
    ///
    /// # Panics
    /// Panics when `ttl` is zero.
    pub fn with_sse_session_ttl(self, ttl: Duration) -> Self {
        assert!(ttl != Duration::ZERO, "SSE session TTL must be greater than 0");
        Self {
            sse_session_ttl: ttl,
            ..self
        }
    }

    /// Takes the sender's receiver half and assembles the [`HttpContext`]
    /// that the engine runs with.
    ///
    /// # Errors
    /// Returns an `InternalError` when the receiver half has already been
    /// taken, i.e. this was called before on the same server.
    fn build_context_and_engine(&mut self) -> Result<(HttpContext, Receiver<Message>), Error> {
        let outbound_rx = self.sender.rx.take().ok_or_else(|| {
            Error::new(
                ErrorCode::InternalError,
                "The HTTP writer is already in use",
            )
        })?;

        let pending = std::sync::Arc::new(dashmap::DashMap::new());
        let registry = crate::shared::SseSessionRegistry::new(self.sse_buffer_capacity);
        let sse_registry = std::sync::Arc::new(registry);

        let context = HttpContext {
            addr: self.url.addr.into(),
            endpoint: self.url.endpoint.into(),
            pending,
            sse_registry,
            inbound_tx: self.receiver.tx.clone(),
            sse_live_queue_capacity: self.sse_live_queue_capacity,
            sse_log_queue_capacity: self.sse_log_queue_capacity,
        };
        Ok((context, outbound_rx))
    }
}
#[cfg(feature = "http-server-volga")]
impl HttpServer<server::DefaultClaims, VolgaEngine> {
    /// Creates a server for `addr` backed by a default [`VolgaEngine`].
    ///
    /// Everything else starts from the defaults: plain HTTP, the default
    /// endpoint, the module's default SSE settings and fresh channels.
    pub fn new(addr: &'static str) -> Self {
        let mut url = ServiceUrl::default();
        url.addr = addr;
        let receiver = HttpReceiver::new();
        let sender = HttpSender::new();
        Self {
            url,
            engine: Some(VolgaEngine::default()),
            sender,
            receiver,
            sse_buffer_capacity: DEFAULT_SSE_BUFFER_CAPACITY,
            sse_live_queue_capacity: DEFAULT_SSE_LIVE_QUEUE_CAPACITY,
            sse_log_queue_capacity: DEFAULT_SSE_LOG_QUEUE_CAPACITY,
            sse_cleanup_interval: DEFAULT_SSE_CLEANUP_INTERVAL,
            sse_session_ttl: DEFAULT_SSE_SESSION_TTL,
            _claims: std::marker::PhantomData,
        }
    }

    /// Stores on the engine the auth configuration produced by `config`,
    /// which receives a default `AuthConfig` to customise.
    ///
    /// # Panics
    /// Panics when the engine has already been taken out of the server.
    pub fn with_auth<F>(mut self, config: F) -> Self
    where
        F: FnOnce(server::AuthConfig) -> server::AuthConfig,
    {
        // The engine is looked up first so a missing engine panics before
        // `config` gets a chance to run.
        let volga = self
            .engine
            .as_mut()
            .expect("HttpServer::with_auth called after start()");
        let auth = config(server::AuthConfig::default());
        volga.auth = Some(auth);
        self
    }

    /// Stores on the engine the TLS configuration produced by `config`,
    /// which receives a default `TlsConfig`, and switches the URL scheme to HTTPS.
    ///
    /// # Panics
    /// Panics when the engine has already been taken out of the server.
    #[cfg(feature = "server-tls")]
    pub fn with_tls<F>(mut self, config: F) -> Self
    where
        F: FnOnce(TlsConfig) -> TlsConfig,
    {
        // Same ordering as `with_auth`: engine check first, then `config`.
        let volga = self
            .engine
            .as_mut()
            .expect("HttpServer::with_tls called after start()");
        let tls = config(TlsConfig::default());
        volga.tls = Some(tls);
        self.url.proto = HttpProto::Https;
        self
    }
}
#[cfg(feature = "http-client")]
impl HttpClient {
    /// Replaces the address part of the client URL.
    pub fn bind(self, addr: &'static str) -> Self {
        Self {
            url: ServiceUrl { addr, ..self.url },
            ..self
        }
    }

    /// Replaces the endpoint part of the client URL.
    pub fn with_endpoint(self, prefix: &'static str) -> Self {
        Self {
            url: ServiceUrl {
                endpoint: prefix,
                ..self.url
            },
            ..self
        }
    }

    /// Stores the TLS settings produced by `config`, which receives the
    /// default settings to customise, and switches the URL scheme to HTTPS.
    #[cfg(feature = "client-tls")]
    pub fn with_tls<F>(mut self, config: F) -> Self
    where
        F: FnOnce(McpClientTlsConfig) -> McpClientTlsConfig,
    {
        let tls = config(McpClientTlsConfig::default());
        self.tls_config = Some(tls);
        self.url.proto = HttpProto::Https;
        self
    }

    /// Stores the bytes of `access_token` on the client.
    pub fn with_auth(mut self, access_token: impl Into<String>) -> Self {
        let token: String = access_token.into();
        let token_bytes = token.into_bytes().into_boxed_slice();
        self.access_token = Some(token_bytes);
        self
    }

    /// Moves the pieces the client task needs out of `self`.
    ///
    /// # Errors
    /// Returns an `InternalError` when the sender's receiver half has
    /// already been taken, and propagates a failure to build the TLS
    /// settings (with the `client-tls` feature).
    fn runtime(&mut self) -> Result<ClientRuntimeContext, Error> {
        let outbound_rx = self.sender.rx.take().ok_or_else(|| {
            Error::new(
                ErrorCode::InternalError,
                "The HTTP writer is already in use",
            )
        })?;

        #[cfg(feature = "client-tls")]
        let tls_config = match self.tls_config.take() {
            Some(tls) => Some(tls.build()?),
            None => None,
        };

        let context = ClientRuntimeContext {
            url: self.url,
            tx: self.receiver.tx.clone(),
            rx: outbound_rx,
            access_token: self.access_token.take(),
            #[cfg(feature = "client-tls")]
            tls_config,
        };
        Ok(context)
    }
}
impl TransportSender for HttpSender {
async fn send(&mut self, msg: Message) -> Result<(), Error> {
self.tx
.send(msg)
.map_err(|err| Error::new(ErrorCode::InternalError, err))
.await
}
}
impl TransportReceiver for HttpReceiver {
async fn recv(&mut self) -> Result<Message, Error> {
self.rx.recv().await.unwrap_or_else(|| {
Err(Error::new(
ErrorCode::InvalidRequest,
"Unexpected end of stream",
))
})
}
}
/// [`Transport`] implementation that runs the configured engine together
/// with the dispatch and session-cleanup tasks.
#[cfg(feature = "http-server")]
impl<C, E> Transport for HttpServer<C, E>
where
C: Send + 'static,
E: HttpEngine,
{
type Sender = HttpSender;
type Receiver = HttpReceiver;
/// Starts the server and returns the cancellation token shared with the
/// spawned task.
///
/// One tokio task is spawned that drives three futures concurrently:
/// `dispatch`, `cleanup_stale_sessions` and `engine.run`. If the engine
/// returns an error, the token is cancelled.
///
/// If the context cannot be built (the sender's receiver half was already
/// taken), nothing is spawned and a fresh token is returned as-is; the
/// failure is only logged when the `tracing` feature is enabled.
///
/// # Panics
/// Panics when the context was built but the engine is `None`.
fn start(&mut self) -> CancellationToken {
let token = CancellationToken::new();
// Takes the sender's receiver half; this is what fails on a repeated start.
let (ctx, sender_rx) = match self.build_context_and_engine() {
Ok(x) => x,
Err(_err) => {
#[cfg(feature = "tracing")]
tracing::error!(logger = "neva", "Failed to start HTTP server: {}", _err);
// NOTE(review): the token returned here is not cancelled and has no task
// behind it — confirm callers do not wait on it indefinitely.
return token;
}
};
let engine = self
.engine
.take()
.expect("HttpServer::start called twice or after engine was moved");
// `ctx` is moved into the engine below, so the shared handles needed by the
// other two futures are cloned out of it first.
let pending = ctx.pending.clone();
let sse_registry = ctx.sse_registry.clone();
let cleanup_registry = ctx.sse_registry.clone();
let cleanup_interval = self.sse_cleanup_interval;
let session_ttl = self.sse_session_ttl;
let engine_token = token.clone();
tokio::spawn(async move {
// All three futures receive a clone of the same token.
tokio::join!(
core::dispatch::dispatch(pending, sse_registry, sender_rx, engine_token.clone(),),
core::cleanup::cleanup_stale_sessions(
cleanup_registry,
cleanup_interval,
session_ttl,
engine_token.clone(),
),
async {
if let Err(_e) = engine.run(ctx, engine_token.clone()).await {
#[cfg(feature = "tracing")]
tracing::error!(logger = "neva", "HTTP engine error: {:?}", _e);
// Cancels the shared token when the engine fails.
engine_token.cancel();
}
}
);
});
token
}
/// Consumes the server and returns its sender and receiver halves.
#[inline]
fn split(self) -> (Self::Sender, Self::Receiver) {
(self.sender, self.receiver)
}
}
/// `HttpTransport` implementation that forwards to the [`Transport`]
/// implementation of the same server.
#[cfg(feature = "http-server")]
impl<C, E> core::engine::HttpTransport for HttpServer<C, E>
where
    C: Send + 'static,
    E: HttpEngine,
{
    /// Delegates to `Transport::start`.
    fn start(&mut self) -> CancellationToken {
        Transport::start(self)
    }

    /// Unboxes the server and delegates to `Transport::split`.
    fn split_into_proto(self: Box<Self>) -> (HttpSender, HttpReceiver) {
        <Self as Transport>::split(*self)
    }

    /// Returns the server URL rendered as a string.
    fn url_label(&self) -> String {
        let url = &self.url;
        url.to_string()
    }
}
#[cfg(feature = "http-client")]
impl Transport for HttpClient {
    type Sender = HttpSender;
    type Receiver = HttpReceiver;

    /// Starts the client and returns the cancellation token handed to the
    /// spawned connection task.
    ///
    /// When the runtime context cannot be built, nothing is spawned and the
    /// fresh token is returned as-is; the failure is only logged when the
    /// `tracing` feature is enabled.
    fn start(&mut self) -> CancellationToken {
        let token = CancellationToken::new();
        match self.runtime() {
            Ok(runtime) => {
                tokio::spawn(client::connect(runtime, token.clone()));
            }
            Err(_err) => {
                #[cfg(feature = "tracing")]
                tracing::error!(logger = "neva", "Failed to start HTTP client: {}", _err);
            }
        }
        token
    }

    /// Consumes the client and returns its sender and receiver halves.
    fn split(self) -> (Self::Sender, Self::Receiver) {
        let Self {
            sender, receiver, ..
        } = self;
        (sender, receiver)
    }
}
// Smoke test for the engine wiring of `HttpServer`: checks that starting the
// transport runs the engine and that cancelling the returned token reaches it.
#[cfg(all(test, feature = "http-server"))]
mod engine_smoke_tests {
use super::*;
use crate::error::Error;
use crate::transport::Transport;
use crate::transport::http::core::{
context::HttpContext,
engine::HttpEngine,
types::{HttpRequest, HttpResponse},
};
use crate::types::Message;
use std::future::Future;
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
/// Engine stub that records, through shared flags, when `run` was entered
/// and when it finished.
#[derive(Default)]
struct MockEngine {
/// Set to `true` as soon as `run` begins executing.
started: Arc<AtomicBool>,
/// Set to `true` after `run` has observed cancellation.
exited: Arc<AtomicBool>,
}
impl HttpEngine for MockEngine {
type Request = HttpRequest;
type Response = HttpResponse;
type SseEvent = ();
// The adapters pass values through unchanged and the event builders
// return `()`; only `run` matters for this test.
async fn adapt_request(req: Self::Request) -> Result<HttpRequest, Error> {
Ok(req)
}
fn adapt_response(resp: HttpResponse) -> Self::Response {
resp
}
fn tracked_event(_seq: u64, _msg: &Message) -> Self::SseEvent {}
fn ephemeral_event(_msg: &Message) -> Self::SseEvent {}
/// Marks `started`, waits until `token` is cancelled, marks `exited`
/// and returns `Ok(())`.
fn run(
self,
_ctx: HttpContext,
token: CancellationToken,
) -> impl Future<Output = Result<(), Error>> + Send {
let started = self.started;
let exited = self.exited;
async move {
started.store(true, Ordering::SeqCst);
token.cancelled().await;
exited.store(true, Ordering::SeqCst);
Ok(())
}
}
}
/// Starts a server with the mock engine, then cancels the returned token.
/// The test relies on two fixed 50 ms sleeps to give the spawned task time
/// to reach each state before the flags are checked.
#[tokio::test(flavor = "multi_thread")]
async fn engine_run_is_invoked_and_cancellation_propagates() {
let started = Arc::new(AtomicBool::new(false));
let exited = Arc::new(AtomicBool::new(false));
let engine = MockEngine {
started: started.clone(),
exited: exited.clone(),
};
let mut server = HttpServer::from_engine("127.0.0.1:0", engine);
let token = <HttpServer<_, _> as Transport>::start(&mut server);
// Give the spawned task time to enter `run`.
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(started.load(Ordering::SeqCst), "engine.run was not invoked");
token.cancel();
// Give the engine time to observe the cancellation.
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(
exited.load(Ordering::SeqCst),
"engine did not exit on cancellation"
);
}
}