Skip to main content

specmock_runtime/
lib.rs

//! Runtime servers for HTTP (OpenAPI), WebSocket (AsyncAPI), and gRPC (Protobuf).
2
3use std::{net::SocketAddr, path::PathBuf, sync::Arc};
4
5use specmock_core::MockMode;
6use tokio::{sync::oneshot, task::JoinHandle};
7
8pub mod grpc;
9pub mod http;
10pub mod ws;
11
/// Upper bound on a request body unless configured otherwise: 10 MiB.
const DEFAULT_MAX_BODY_SIZE: usize = 10 * 1024 * 1024;

/// Address the HTTP listener uses by default: IPv4 loopback, port 0.
const DEFAULT_HTTP_ADDR: ([u8; 4], u16) = ([127, 0, 0, 1], 0);

/// Address the gRPC listener uses by default: IPv4 loopback, port 0.
const DEFAULT_GRPC_ADDR: ([u8; 4], u16) = ([127, 0, 0, 1], 0);

/// Path the WebSocket endpoint is served on by default.
const DEFAULT_WS_PATH: &str = "/ws";

/// Seed used by default so that runs are deterministic.
const DEFAULT_SEED: u64 = 42;
26
27/// Runtime configuration.
28#[derive(Debug, Clone)]
29pub struct ServerConfig {
30    /// OpenAPI spec file.
31    pub openapi_spec: Option<PathBuf>,
32    /// AsyncAPI spec file.
33    pub asyncapi_spec: Option<PathBuf>,
34    /// Protobuf root file.
35    pub proto_spec: Option<PathBuf>,
36    /// Runtime mode.
37    pub mode: MockMode,
38    /// Upstream base URL for proxy mode.
39    pub upstream: Option<String>,
40    /// Deterministic seed used by faker.
41    pub seed: u64,
42    /// HTTP/WS listen address.
43    pub http_addr: SocketAddr,
44    /// gRPC listen address.
45    pub grpc_addr: SocketAddr,
46    /// WebSocket path.
47    pub ws_path: String,
48    /// Maximum request body size in bytes (default 10 MiB).
49    pub max_body_size: usize,
50}
51
52impl Default for ServerConfig {
53    fn default() -> Self {
54        Self {
55            openapi_spec: None,
56            asyncapi_spec: None,
57            proto_spec: None,
58            mode: MockMode::Mock,
59            upstream: None,
60            seed: DEFAULT_SEED,
61            http_addr: SocketAddr::from(DEFAULT_HTTP_ADDR),
62            grpc_addr: SocketAddr::from(DEFAULT_GRPC_ADDR),
63            ws_path: DEFAULT_WS_PATH.to_owned(),
64            max_body_size: DEFAULT_MAX_BODY_SIZE,
65        }
66    }
67}
68
69impl ServerConfig {
70    /// Validate the configuration.
71    pub fn validate(&self) -> Result<(), RuntimeError> {
72        // Check that at least one spec is provided
73        if self.openapi_spec.is_none() && self.asyncapi_spec.is_none() && self.proto_spec.is_none()
74        {
75            return Err(RuntimeError::Config(
76                "at least one spec must be provided: openapi_spec, asyncapi_spec, or proto_spec"
77                    .to_owned(),
78            ));
79        }
80
81        // Check proxy mode configuration
82        if self.mode == MockMode::Proxy && self.upstream.is_none() {
83            return Err(RuntimeError::Config(
84                "proxy mode requires upstream base URL (--upstream)".to_owned(),
85            ));
86        }
87
88        // Check that HTTP and gRPC addresses are not the same
89        if self.http_addr == self.grpc_addr &&
90            self.http_addr.port() != 0 &&
91            self.grpc_addr.port() != 0
92        {
93            return Err(RuntimeError::Config(
94                "HTTP and gRPC addresses must be different".to_owned(),
95            ));
96        }
97
98        // Check that WebSocket path starts with /
99        if !self.ws_path.starts_with('/') {
100            return Err(RuntimeError::Config("WebSocket path must start with '/'".to_owned()));
101        }
102
103        // Check that max_body_size is reasonable
104        if self.max_body_size == 0 {
105            return Err(RuntimeError::Config("max_body_size must be greater than 0".to_owned()));
106        }
107
108        // Check that spec files exist
109        if let Some(ref path) = self.openapi_spec &&
110            !path.exists()
111        {
112            return Err(RuntimeError::Config(format!(
113                "OpenAPI spec file does not exist: {}",
114                path.display()
115            )));
116        }
117
118        if let Some(ref path) = self.asyncapi_spec &&
119            !path.exists()
120        {
121            return Err(RuntimeError::Config(format!(
122                "AsyncAPI spec file does not exist: {}",
123                path.display()
124            )));
125        }
126
127        if let Some(ref path) = self.proto_spec &&
128            !path.exists()
129        {
130            return Err(RuntimeError::Config(format!(
131                "Protobuf spec file does not exist: {}",
132                path.display()
133            )));
134        }
135
136        Ok(())
137    }
138}
139
140/// Runtime handle.
141#[derive(Debug)]
142pub struct RunningServer {
143    /// Bound HTTP address.
144    pub http_addr: SocketAddr,
145    /// Bound gRPC address, if proto runtime is active.
146    pub grpc_addr: Option<SocketAddr>,
147    shutdown_tx: Option<oneshot::Sender<()>>,
148    tasks: Vec<JoinHandle<()>>,
149}
150
151impl RunningServer {
152    /// Shut down runtime tasks gracefully.
153    pub async fn shutdown(mut self) {
154        if let Some(shutdown_tx) = self.shutdown_tx.take() {
155            let _ignored = shutdown_tx.send(());
156        }
157        for task in self.tasks.drain(..) {
158            let _ignored = task.await;
159        }
160    }
161}
162
163impl Drop for RunningServer {
164    fn drop(&mut self) {
165        if let Some(shutdown_tx) = self.shutdown_tx.take() {
166            let _ignored = shutdown_tx.send(());
167        }
168        for task in &self.tasks {
169            task.abort();
170        }
171    }
172}
173
174/// Runtime error.
175#[derive(Debug, thiserror::Error)]
176pub enum RuntimeError {
177    /// Invalid configuration.
178    #[error("invalid configuration: {0}")]
179    Config(String),
180    /// IO error.
181    #[error("io error: {0}")]
182    Io(#[from] std::io::Error),
183    /// Serialization / parsing error.
184    #[error("parse error: {0}")]
185    Parse(String),
186}
187
188/// Start protocol runtimes.
189pub async fn start(config: ServerConfig) -> Result<RunningServer, RuntimeError> {
190    config.validate()?;
191
192    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
193    let shared_shutdown = Arc::new(tokio::sync::Notify::new());
194
195    let http_runtime = http::HttpRuntime::from_config(&config).await?;
196    let (http_addr, http_task) =
197        http::spawn_http_server(http_runtime, config.http_addr, Arc::clone(&shared_shutdown))
198            .await?;
199
200    let mut tasks = vec![http_task];
201    let mut grpc_addr = None;
202
203    if config.proto_spec.is_some() {
204        let grpc_runtime = grpc::GrpcRuntime::from_config(&config).await?;
205        let (bound_grpc_addr, grpc_task) =
206            grpc::spawn_grpc_server(grpc_runtime, config.grpc_addr, Arc::clone(&shared_shutdown))
207                .await?;
208        grpc_addr = Some(bound_grpc_addr);
209        tasks.push(grpc_task);
210    }
211
212    // Relay oneshot shutdown to notify-based shutdown for all tasks.
213    let relay_notify = Arc::clone(&shared_shutdown);
214    tasks.push(tokio::spawn(async move {
215        let _ignored = shutdown_rx.await;
216        relay_notify.notify_waiters();
217    }));
218
219    Ok(RunningServer { http_addr, grpc_addr, shutdown_tx: Some(shutdown_tx), tasks })
220}