use std::{net::SocketAddr, path::PathBuf, sync::Arc};
use specmock_core::MockMode;
use tokio::{sync::oneshot, task::JoinHandle};
pub mod grpc;
pub mod http;
pub mod ws;
/// Default for `ServerConfig::max_body_size`: 10 * 1024 * 1024 (10 MiB if counted in bytes).
const DEFAULT_MAX_BODY_SIZE: usize = 10 << 20;
/// Default HTTP listen address: IPv4 loopback, port 0 (conventionally "let the OS pick a port").
const DEFAULT_HTTP_ADDR: ([u8; 4], u16) = ([127, 0, 0, 1], 0);
/// Default gRPC listen address: IPv4 loopback, port 0 (conventionally "let the OS pick a port").
const DEFAULT_GRPC_ADDR: ([u8; 4], u16) = ([127, 0, 0, 1], 0);
/// Default for `ServerConfig::ws_path`.
const DEFAULT_WS_PATH: &str = "/ws";
/// Default for `ServerConfig::seed`.
const DEFAULT_SEED: u64 = 42;
/// Settings consumed by [`start`] to bring up the mock servers.
///
/// Build one with `ServerConfig::default()` plus struct-update syntax, then
/// call [`ServerConfig::validate`] (which [`start`] also does) to check it.
#[derive(Debug, Clone)]
pub struct ServerConfig {
/// Path to an OpenAPI spec file; when set, `validate` requires it to exist.
pub openapi_spec: Option<PathBuf>,
/// Path to an AsyncAPI spec file; when set, `validate` requires it to exist.
pub asyncapi_spec: Option<PathBuf>,
/// Path to a Protobuf spec file; when set, `validate` requires it to exist
/// and `start` additionally launches the gRPC server.
pub proto_spec: Option<PathBuf>,
/// Operating mode. `MockMode::Proxy` requires `upstream` to be set.
pub mode: MockMode,
/// Upstream base URL, mandatory in proxy mode (CLI flag `--upstream`).
pub upstream: Option<String>,
/// Seed value (default 42).
/// NOTE(review): presumably drives deterministic mock data generation — confirm in the runtimes.
pub seed: u64,
/// Address handed to the HTTP server; a port of 0 is exempt from the
/// HTTP/gRPC address-collision check in `validate`.
pub http_addr: SocketAddr,
/// Address handed to the gRPC server (only used when `proto_spec` is set).
pub grpc_addr: SocketAddr,
/// WebSocket path; `validate` requires a leading `/`.
pub ws_path: String,
/// Body size limit; `validate` requires it to be non-zero.
/// NOTE(review): unit is presumably bytes (default is 10 * 1024 * 1024) — confirm.
pub max_body_size: usize,
}
impl Default for ServerConfig {
    /// Returns the baseline configuration: no specs, mock mode, no upstream,
    /// and every tunable set to its `DEFAULT_*` constant.
    ///
    /// The result does not pass `validate` on its own because no spec is set.
    fn default() -> Self {
        let http_addr: SocketAddr = DEFAULT_HTTP_ADDR.into();
        let grpc_addr: SocketAddr = DEFAULT_GRPC_ADDR.into();

        Self {
            // Specs are opt-in; the caller must supply at least one.
            openapi_spec: None,
            asyncapi_spec: None,
            proto_spec: None,

            // Behaviour.
            mode: MockMode::Mock,
            upstream: None,
            seed: DEFAULT_SEED,

            // Network and limits.
            http_addr,
            grpc_addr,
            ws_path: String::from(DEFAULT_WS_PATH),
            max_body_size: DEFAULT_MAX_BODY_SIZE,
        }
    }
}
impl ServerConfig {
pub fn validate(&self) -> Result<(), RuntimeError> {
if self.openapi_spec.is_none() && self.asyncapi_spec.is_none() && self.proto_spec.is_none()
{
return Err(RuntimeError::Config(
"at least one spec must be provided: openapi_spec, asyncapi_spec, or proto_spec"
.to_owned(),
));
}
if self.mode == MockMode::Proxy && self.upstream.is_none() {
return Err(RuntimeError::Config(
"proxy mode requires upstream base URL (--upstream)".to_owned(),
));
}
if self.http_addr == self.grpc_addr &&
self.http_addr.port() != 0 &&
self.grpc_addr.port() != 0
{
return Err(RuntimeError::Config(
"HTTP and gRPC addresses must be different".to_owned(),
));
}
if !self.ws_path.starts_with('/') {
return Err(RuntimeError::Config("WebSocket path must start with '/'".to_owned()));
}
if self.max_body_size == 0 {
return Err(RuntimeError::Config("max_body_size must be greater than 0".to_owned()));
}
if let Some(ref path) = self.openapi_spec &&
!path.exists()
{
return Err(RuntimeError::Config(format!(
"OpenAPI spec file does not exist: {}",
path.display()
)));
}
if let Some(ref path) = self.asyncapi_spec &&
!path.exists()
{
return Err(RuntimeError::Config(format!(
"AsyncAPI spec file does not exist: {}",
path.display()
)));
}
if let Some(ref path) = self.proto_spec &&
!path.exists()
{
return Err(RuntimeError::Config(format!(
"Protobuf spec file does not exist: {}",
path.display()
)));
}
Ok(())
}
}
/// Handle to the servers launched by [`start`].
///
/// Call `shutdown` to signal the servers and wait for their tasks to finish.
/// Merely dropping the handle also sends the shutdown signal but aborts the
/// tasks instead of waiting for them.
#[derive(Debug)]
pub struct RunningServer {
/// Address reported by the HTTP server spawn.
/// NOTE(review): presumably the actual bound address (relevant when port 0 was requested) — confirm in `http`.
pub http_addr: SocketAddr,
/// Address reported by the gRPC server spawn; `None` when no Protobuf spec was configured.
pub grpc_addr: Option<SocketAddr>,
// One-shot trigger for shutdown; `None` once it has been fired.
shutdown_tx: Option<oneshot::Sender<()>>,
// Join handles for every task spawned by `start` (servers plus the shutdown relay).
tasks: Vec<JoinHandle<()>>,
}
impl RunningServer {
    /// Signals shutdown and waits for every owned task to finish.
    ///
    /// Tasks are awaited one after another in the order they were registered.
    /// Failures to send the signal and task join errors are deliberately ignored.
    pub async fn shutdown(mut self) {
        if let Some(signal) = self.shutdown_tx.take() {
            // An error here only means the receiving side is already gone.
            let _send_result = signal.send(());
        }

        // Take the handles out of `self` so the `Drop` impl finds nothing left to abort.
        let handles = std::mem::take(&mut self.tasks);
        for handle in handles {
            let _join_result = handle.await;
        }
    }
}
impl Drop for RunningServer {
    /// Best-effort cleanup when the handle is dropped without `shutdown`:
    /// fire the shutdown signal if it is still pending, then abort every
    /// task that is still tracked.
    fn drop(&mut self) {
        if let Some(signal) = self.shutdown_tx.take() {
            // An error here only means the receiving side is already gone.
            let _send_result = signal.send(());
        }
        self.tasks.iter().for_each(|handle| handle.abort());
    }
}
/// Errors surfaced while configuring or starting the mock servers.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeError {
/// The configuration was rejected; the payload is the human-readable reason
/// (produced by `ServerConfig::validate`).
#[error("invalid configuration: {0}")]
Config(String),
/// An underlying I/O operation failed; converts automatically from
/// `std::io::Error` via `?`.
#[error("io error: {0}")]
Io(#[from] std::io::Error),
/// Something could not be parsed; the payload is the description.
/// NOTE(review): not constructed in this file — presumably raised by spec loading in the submodules.
#[error("parse error: {0}")]
Parse(String),
}
pub async fn start(config: ServerConfig) -> Result<RunningServer, RuntimeError> {
config.validate()?;
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let shared_shutdown = Arc::new(tokio::sync::Notify::new());
let http_runtime = http::HttpRuntime::from_config(&config).await?;
let (http_addr, http_task) =
http::spawn_http_server(http_runtime, config.http_addr, Arc::clone(&shared_shutdown))
.await?;
let mut tasks = vec![http_task];
let mut grpc_addr = None;
if config.proto_spec.is_some() {
let grpc_runtime = grpc::GrpcRuntime::from_config(&config).await?;
let (bound_grpc_addr, grpc_task) =
grpc::spawn_grpc_server(grpc_runtime, config.grpc_addr, Arc::clone(&shared_shutdown))
.await?;
grpc_addr = Some(bound_grpc_addr);
tasks.push(grpc_task);
}
let relay_notify = Arc::clone(&shared_shutdown);
tasks.push(tokio::spawn(async move {
let _ignored = shutdown_rx.await;
relay_notify.notify_waiters();
}));
Ok(RunningServer { http_addr, grpc_addr, shutdown_tx: Some(shutdown_tx), tasks })
}