1use std::{net::SocketAddr, path::PathBuf, sync::Arc};
4
5use specmock_core::MockMode;
6use tokio::{sync::oneshot, task::JoinHandle};
7
/// gRPC side of the mock server; provides the `GrpcRuntime` and
/// `spawn_grpc_server` items used by `start`.
pub mod grpc;
/// HTTP side of the mock server; provides the `HttpRuntime` and
/// `spawn_http_server` items used by `start`.
pub mod http;
/// WebSocket support.
// NOTE(review): presumably driven by `ServerConfig::asyncapi_spec` and
// `ServerConfig::ws_path`; nothing in this file calls into it directly — confirm.
pub mod ws;
11
/// Default for [`ServerConfig::max_body_size`]: 10 * 1024 * 1024
/// (10 MiB, assuming the limit is expressed in bytes — TODO confirm).
const DEFAULT_MAX_BODY_SIZE: usize = 10 * 1024 * 1024;

/// Default HTTP listen address: loopback `127.0.0.1` with port `0`.
/// Port `0` conventionally asks the OS for a free port; the address actually
/// used is reported back through [`RunningServer::http_addr`].
const DEFAULT_HTTP_ADDR: ([u8; 4], u16) = ([127, 0, 0, 1], 0);

/// Default gRPC listen address: loopback `127.0.0.1` with port `0`.
const DEFAULT_GRPC_ADDR: ([u8; 4], u16) = ([127, 0, 0, 1], 0);

/// Default for [`ServerConfig::ws_path`].
const DEFAULT_WS_PATH: &str = "/ws";

/// Default for [`ServerConfig::seed`].
const DEFAULT_SEED: u64 = 42;
26
/// Configuration consumed by [`start`] to launch the mock server.
///
/// Build it with [`ServerConfig::default`] plus struct-update syntax and call
/// [`ServerConfig::validate`] (also invoked by [`start`]) to check it.
#[derive(Debug, Clone)]
pub struct ServerConfig {
    /// Path to an OpenAPI spec file. When set, the path must exist.
    pub openapi_spec: Option<PathBuf>,
    /// Path to an AsyncAPI spec file. When set, the path must exist.
    pub asyncapi_spec: Option<PathBuf>,
    /// Path to a Protobuf spec file. When set, the path must exist and
    /// [`start`] additionally launches the gRPC server.
    pub proto_spec: Option<PathBuf>,
    /// Operating mode. `MockMode::Proxy` requires [`Self::upstream`] to be set.
    pub mode: MockMode,
    /// Upstream base URL; mandatory in proxy mode.
    pub upstream: Option<String>,
    /// Seed value (defaults to 42).
    // NOTE(review): presumably seeds deterministic mock-data generation — confirm.
    pub seed: u64,
    /// Address the HTTP server is asked to listen on.
    pub http_addr: SocketAddr,
    /// Address the gRPC server is asked to listen on. Must differ from
    /// [`Self::http_addr`] unless the port is `0`.
    pub grpc_addr: SocketAddr,
    /// WebSocket path; must start with `/`.
    pub ws_path: String,
    /// Maximum body size; must be greater than zero.
    // NOTE(review): unit is presumably bytes — confirm against the HTTP module.
    pub max_body_size: usize,
}
51
52impl Default for ServerConfig {
53 fn default() -> Self {
54 Self {
55 openapi_spec: None,
56 asyncapi_spec: None,
57 proto_spec: None,
58 mode: MockMode::Mock,
59 upstream: None,
60 seed: DEFAULT_SEED,
61 http_addr: SocketAddr::from(DEFAULT_HTTP_ADDR),
62 grpc_addr: SocketAddr::from(DEFAULT_GRPC_ADDR),
63 ws_path: DEFAULT_WS_PATH.to_owned(),
64 max_body_size: DEFAULT_MAX_BODY_SIZE,
65 }
66 }
67}
68
69impl ServerConfig {
70 pub fn validate(&self) -> Result<(), RuntimeError> {
72 if self.openapi_spec.is_none() && self.asyncapi_spec.is_none() && self.proto_spec.is_none()
74 {
75 return Err(RuntimeError::Config(
76 "at least one spec must be provided: openapi_spec, asyncapi_spec, or proto_spec"
77 .to_owned(),
78 ));
79 }
80
81 if self.mode == MockMode::Proxy && self.upstream.is_none() {
83 return Err(RuntimeError::Config(
84 "proxy mode requires upstream base URL (--upstream)".to_owned(),
85 ));
86 }
87
88 if self.http_addr == self.grpc_addr &&
90 self.http_addr.port() != 0 &&
91 self.grpc_addr.port() != 0
92 {
93 return Err(RuntimeError::Config(
94 "HTTP and gRPC addresses must be different".to_owned(),
95 ));
96 }
97
98 if !self.ws_path.starts_with('/') {
100 return Err(RuntimeError::Config("WebSocket path must start with '/'".to_owned()));
101 }
102
103 if self.max_body_size == 0 {
105 return Err(RuntimeError::Config("max_body_size must be greater than 0".to_owned()));
106 }
107
108 if let Some(ref path) = self.openapi_spec &&
110 !path.exists()
111 {
112 return Err(RuntimeError::Config(format!(
113 "OpenAPI spec file does not exist: {}",
114 path.display()
115 )));
116 }
117
118 if let Some(ref path) = self.asyncapi_spec &&
119 !path.exists()
120 {
121 return Err(RuntimeError::Config(format!(
122 "AsyncAPI spec file does not exist: {}",
123 path.display()
124 )));
125 }
126
127 if let Some(ref path) = self.proto_spec &&
128 !path.exists()
129 {
130 return Err(RuntimeError::Config(format!(
131 "Protobuf spec file does not exist: {}",
132 path.display()
133 )));
134 }
135
136 Ok(())
137 }
138}
139
/// Handle to the servers launched by [`start`].
///
/// Call [`RunningServer::shutdown`] to signal the tasks and wait for them;
/// dropping the handle instead signals shutdown and aborts the tasks.
#[derive(Debug)]
pub struct RunningServer {
    /// Address reported by the HTTP server after it was spawned.
    pub http_addr: SocketAddr,
    /// Address reported by the gRPC server; `None` when no proto spec was
    /// configured and the gRPC server was therefore not started.
    pub grpc_addr: Option<SocketAddr>,
    /// One-shot shutdown trigger; taken (left as `None`) once it has been used.
    shutdown_tx: Option<oneshot::Sender<()>>,
    /// Join handles of the spawned tasks: the server tasks plus the task that
    /// relays the one-shot signal to the shared `Notify`.
    tasks: Vec<JoinHandle<()>>,
}
150
151impl RunningServer {
152 pub async fn shutdown(mut self) {
154 if let Some(shutdown_tx) = self.shutdown_tx.take() {
155 let _ignored = shutdown_tx.send(());
156 }
157 for task in self.tasks.drain(..) {
158 let _ignored = task.await;
159 }
160 }
161}
162
163impl Drop for RunningServer {
164 fn drop(&mut self) {
165 if let Some(shutdown_tx) = self.shutdown_tx.take() {
166 let _ignored = shutdown_tx.send(());
167 }
168 for task in &self.tasks {
169 task.abort();
170 }
171 }
172}
173
/// Errors produced while configuring or starting the mock server.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeError {
    /// The configuration was rejected; the payload is the human-readable reason
    /// (see [`ServerConfig::validate`]).
    #[error("invalid configuration: {0}")]
    Config(String),
    /// An I/O operation failed. Converts automatically from [`std::io::Error`],
    /// so `?` can be used on I/O results.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// Something could not be parsed; the payload is the message.
    // NOTE(review): not constructed in this file — presumably raised by the
    // spec-loading code in the submodules; confirm.
    #[error("parse error: {0}")]
    Parse(String),
}
187
188pub async fn start(config: ServerConfig) -> Result<RunningServer, RuntimeError> {
190 config.validate()?;
191
192 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
193 let shared_shutdown = Arc::new(tokio::sync::Notify::new());
194
195 let http_runtime = http::HttpRuntime::from_config(&config).await?;
196 let (http_addr, http_task) =
197 http::spawn_http_server(http_runtime, config.http_addr, Arc::clone(&shared_shutdown))
198 .await?;
199
200 let mut tasks = vec![http_task];
201 let mut grpc_addr = None;
202
203 if config.proto_spec.is_some() {
204 let grpc_runtime = grpc::GrpcRuntime::from_config(&config).await?;
205 let (bound_grpc_addr, grpc_task) =
206 grpc::spawn_grpc_server(grpc_runtime, config.grpc_addr, Arc::clone(&shared_shutdown))
207 .await?;
208 grpc_addr = Some(bound_grpc_addr);
209 tasks.push(grpc_task);
210 }
211
212 let relay_notify = Arc::clone(&shared_shutdown);
214 tasks.push(tokio::spawn(async move {
215 let _ignored = shutdown_rx.await;
216 relay_notify.notify_waiters();
217 }));
218
219 Ok(RunningServer { http_addr, grpc_addr, shutdown_tx: Some(shutdown_tx), tasks })
220}