1#![warn(missing_docs)]
86
87use std::env;
88use std::process::ExitCode;
89use std::str::FromStr;
90use std::path::PathBuf;
91use std::sync::{Arc, Mutex};
92use std::time::Duration;
93use std::sync::mpsc::channel;
94
95use clap::{Command, Arg, ArgMatches, ArgAction, command, crate_version};
96use clap::error::ErrorKind;
97use mimalloc::MiMalloc;
98use pact_models::prelude::*;
99use pact_models::prelude::v4::*;
100use regex::Regex;
101use tracing::{debug, error, info, warn};
102use tracing_core::LevelFilter;
103use tracing_subscriber::FmtSubscriber;
104use tokio::sync::broadcast;
105use notify::RecursiveMode;
106use notify_debouncer_mini::{DebouncedEventKind, new_debouncer};
107use crate::loading::load_pacts;
108
109use crate::server::ServerHandler;
110
111fn setup_file_watcher(
113 sources: Vec<PactSource>,
114 matches: &ArgMatches,
115 shared_pacts: Arc<Mutex<Vec<(V4Pact, PactSource)>>>,
116 reload_tx: broadcast::Sender<()>
117) {
118 let watch_paths = get_watch_paths(&sources);
119 if watch_paths.is_empty() {
120 warn!("No file or directory sources found for watching");
121 return;
122 }
123
124 let insecure_tls = matches.get_flag("insecure-tls");
125 let ext = matches.get_one::<String>("ext").cloned();
126
127 std::thread::spawn(move || {
128 let (debounce_tx, debounce_rx) = channel();
129 let mut debouncer = match new_debouncer(Duration::from_secs(1), debounce_tx) {
130 Ok(debouncer) => debouncer,
131 Err(e) => {
132 error!("Failed to create file debouncer: {}", e);
133 return;
134 }
135 };
136
137 for path in &watch_paths {
139 if let Err(e) = debouncer.watcher().watch(path, RecursiveMode::Recursive) {
140 error!("Failed to watch path {:?}: {}", path, e);
141 } else {
142 info!("Watching for changes in: {:?}", path);
143 }
144 }
145
146 let runtime = tokio::runtime::Runtime::new().unwrap();
147
148 loop {
149 match debounce_rx.recv() {
150 Ok(Ok(events)) => {
151 for event in events.iter() {
152 match &event.kind {
153 DebouncedEventKind::Any => {
154 info!("File change detected in watched directory");
155
156 let pacts_result = runtime.block_on(load_pacts(sources.clone(), insecure_tls, ext.as_ref()));
158 if pacts_result.iter().any(|p| p.is_err()) {
159 error!("Error reloading pacts:");
160 for error in pacts_result.iter().filter_map(|p| p.as_ref().err()) {
161 error!(" - {}", error);
162 }
163 } else {
164 let new_pacts = pacts_result.iter()
165 .filter_map(|result| result.as_ref().ok())
166 .map(|(p, s)| (p.as_v4_pact().unwrap(), s.clone()))
167 .collect::<Vec<_>>();
168
169 let interactions: usize = new_pacts.iter().map(|(p, _)| p.interactions.len()).sum();
170 info!("Reloaded {} pacts ({} total interactions)", new_pacts.len(), interactions);
171
172 *shared_pacts.lock().unwrap() = new_pacts;
173 let _ = reload_tx.send(());
174 }
175 break;
176 }
177 _ => {}
178 }
179 }
180 }
181 Ok(Err(e)) => {
182 error!("Watch error: {:?}", e);
183 break;
184 }
185 Err(e) => {
186 error!("Debouncer channel error: {:?}", e);
187 break;
188 }
189 }
190 }
191 });
192}
193
194fn get_watch_paths(sources: &[PactSource]) -> Vec<PathBuf> {
196 sources.iter()
197 .filter_map(|source| match source {
198 PactSource::File(path) => Some(PathBuf::from(path)),
199 PactSource::Dir(path) => Some(PathBuf::from(path)),
200 _ => None, })
202 .collect()
203}
204
// Crate-internal modules. `server` provides `ServerHandler` and `loading` provides
// `load_pacts`, both of which are imported at the top of this file.
mod pact_support;
mod server;
mod loading;

// Use mimalloc as the global allocator.
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
211
212
213pub fn print_version() {
214 println!("pact stub server version : v{}", env!("CARGO_PKG_VERSION"));
215 println!("pact specification version: v{}", PactSpecification::V4.version_str());
216}
217
/// Value parser for the port option: parses `v` as a `u16`.
///
/// On failure the error message quotes the rejected input and the reason it did not parse.
fn integer_value(v: &str) -> Result<u16, String> {
    match v.parse::<u16>() {
        Ok(port) => Ok(port),
        Err(cause) => Err(format!("'{}' is not a valid port value: {}", v, cause)),
    }
}
221
222fn regex_value(v: &str) -> Result<Regex, String> {
223 if v.is_empty() {
224 Err("Regular expression is empty".to_string())
225 } else {
226 Regex::new(v).map_err(|e| format!("'{}' is not a valid regular expression: {}", v, e))
227 }
228}
229
/// A location that pacts can be loaded from.
#[derive(Debug, Clone)]
pub enum PactSource {
    /// A single pact file, identified by its path (populated from the `--file` option).
    File(String),
    /// A directory of pact files, identified by its path (populated from the `--dir` option).
    Dir(String),
    /// A URL to fetch a pact from (populated from the `--url` option), with the optional
    /// authentication built from the `--user` or `--token` option.
    URL(String, Option<HttpAuth>),
    /// A Pact Broker to fetch pacts from (populated from the `--broker-url` option).
    Broker {
        /// Base URL of the broker.
        url: String,
        /// Optional authentication built from the `--user` or `--token` option.
        auth: Option<HttpAuth>,
        /// Regular expressions given with `--consumer-name`; empty when none were supplied.
        consumers: Vec<Regex>,
        /// Regular expressions given with `--provider-name`; empty when none were supplied.
        providers: Vec<Regex>
    },
    /// A source of unknown kind.
    // NOTE(review): never constructed in this file - confirm where (and whether) it is used.
    Unknown
}
253
254fn pact_source(matches: &ArgMatches) -> Vec<PactSource> {
255 let mut sources = vec![];
256
257 if let Some(values) = matches.get_many::<String>("file") {
258 sources.extend(values.map(|v| PactSource::File(v.clone())).collect::<Vec<PactSource>>());
259 }
260
261 if let Some(values) = matches.get_many::<String>("dir") {
262 sources.extend(values.map(|v| PactSource::Dir(v.clone())).collect::<Vec<PactSource>>());
263 }
264
265 if let Some(values) = matches.get_many::<String>("url") {
266 sources.extend(values.map(|v| {
267 let auth = matches.get_one::<String>("user")
268 .map(|u| {
269 let mut auth = u.split(':');
270 HttpAuth::User(auth.next().unwrap().to_string(), auth.next().map(|p| p.to_string()))
271 })
272 .or_else(|| matches.get_one::<String>("token").map(|v| HttpAuth::Token(v.clone())));
273 PactSource::URL(v.clone(), auth)
274 }).collect::<Vec<PactSource>>());
275 }
276
277 if let Some(url) = matches.get_one::<String>("broker-url") {
278 let auth = matches.get_one::<String>("user")
279 .map(|u| {
280 let mut auth = u.split(':');
281 HttpAuth::User(auth.next().unwrap().to_string(), auth.next().map(|p| p.to_string()))
282 })
283 .or_else(|| matches.get_one::<String>("token").map(|v| HttpAuth::Token(v.clone())));
284 debug!("Loading all pacts from Pact Broker at {} using {} authentication", url,
285 auth.clone().map(|auth| auth.to_string()).unwrap_or_else(|| "no".to_string()));
286 sources.push(PactSource::Broker {
287 url: url.to_string(),
288 auth,
289 consumers: matches.get_many::<Regex>("consumer-name").unwrap_or_default().into_iter().cloned().collect(),
290 providers: matches.get_many::<Regex>("provider-name").unwrap_or_default().into_iter().cloned().collect()
291 });
292 }
293
294 sources
295}
296
297pub async fn handle_command_args(args: Vec<String>) -> Result<(), ExitCode> {
301 let app = build_args();
302 match app.try_get_matches_from(args) {
303 Ok(results) => handle_matches(&results).await,
304
305 Err(ref err) => match err.kind() {
306 ErrorKind::DisplayHelp => {
307 println!("{}", err);
308 Ok(())
309 }
310 ErrorKind::DisplayVersion => {
311 print_version();
312 println!();
313 Ok(())
314 }
315 _ => err.exit(),
316 },
317 }
318}
319
320pub fn process_stub_command(args: &ArgMatches) -> Result<(), ExitCode> {
325 tokio::runtime::Runtime::new().unwrap().block_on(async {
326 let res = handle_matches(args).await;
327 match res {
328 Ok(()) => Ok(()),
329 Err(code) => Err(code),
330 }
331 })
332}
333
334async fn handle_matches(matches: &ArgMatches) -> Result<(), ExitCode> {
335 let level = matches.get_one::<String>("loglevel").cloned()
336 .unwrap_or_else(|| "info".to_string());
337 setup_logger(level.as_str());
338 let sources = pact_source(matches);
339 let watch_mode = matches.get_flag("watch");
340
341 let pacts = load_pacts(sources.clone(), matches.get_flag("insecure-tls"),
342 matches.get_one("ext")).await;
343 if pacts.iter().any(|p| p.is_err()) {
344 error!("There were errors loading the pact files.");
345 for error in pacts.iter()
346 .filter(|p| p.is_err())
347 .map(|e| match e {
348 Err(err) => err.clone(),
349 _ => panic!("Internal Code Error - was expecting an error but was not")
350 }) {
351 error!(" - {}", error);
352 }
353 Err(ExitCode::from(3))
354 } else {
355 let port = *matches.get_one::<u16>("port").unwrap_or(&0);
356 let provider_state = matches.get_one::<Regex>("provider-state").cloned();
357 let provider_state_header_name = matches.get_one::<String>("provider-state-header-name").cloned();
358 let empty_provider_states = matches.get_flag("empty-provider-state");
359 let pacts = pacts.iter()
360 .map(|result| {
361 let (p, s) = result.as_ref().unwrap();
363 (p.as_v4_pact().unwrap(), s.clone())
364 })
365 .collect::<Vec<_>>();
366 let interactions: usize = pacts.iter().map(|(p, _)| p.interactions.len()).sum();
367 info!("Loaded {} pacts ({} total interactions)", pacts.len(), interactions);
368 let auto_cors = matches.get_flag("cors");
369 let referer = matches.get_flag("cors-referer");
370
371 if watch_mode {
372 let shared_pacts = Arc::new(Mutex::new(pacts.clone()));
374 let (reload_tx, reload_rx) = broadcast::channel::<()>(1);
375
376 setup_file_watcher(sources, matches, shared_pacts.clone(), reload_tx.clone());
378
379 let server_handler = ServerHandler::new_with_watch(
380 shared_pacts,
381 reload_tx,
382 auto_cors,
383 referer,
384 provider_state,
385 provider_state_header_name,
386 empty_provider_states);
387 tokio::task::spawn_blocking(move || {
388 server_handler.start_server(port)
389 }).await.unwrap()
390 } else {
391 let server_handler = ServerHandler::new(
392 pacts,
393 auto_cors,
394 referer,
395 provider_state,
396 provider_state_header_name,
397 empty_provider_states);
398 tokio::task::spawn_blocking(move || {
399 server_handler.start_server(port)
400 }).await.unwrap()
401 }
402 }
403}
404
405pub fn build_args() -> Command {
408 command!()
409 .about(format!("Pact Stub Server {}", crate_version!()))
410 .arg_required_else_help(true)
411 .disable_version_flag(true)
412 .arg(Arg::new("loglevel")
413 .short('l')
414 .long("loglevel")
415 .default_value("info")
416 .value_parser(["error", "warn", "info", "debug", "trace", "none"])
417 .help("Log level (defaults to info)"))
418 .arg(Arg::new("file")
419 .short('f')
420 .long("file")
421 .required_unless_present_any(&["dir", "url", "broker-url"])
422 .action(ArgAction::Append)
423 .value_parser(clap::builder::NonEmptyStringValueParser::new())
424 .help("Pact file to load (can be repeated)"))
425 .arg(Arg::new("dir")
426 .short('d')
427 .long("dir")
428 .required_unless_present_any(&["file", "url", "broker-url"])
429 .action(ArgAction::Append)
430 .value_parser(clap::builder::NonEmptyStringValueParser::new())
431 .help("Directory of pact files to load (can be repeated)"))
432 .arg(Arg::new("ext")
433 .short('e')
434 .long("extension")
435 .value_parser(clap::builder::NonEmptyStringValueParser::new())
436 .requires("dir")
437 .help("File extension to use when loading from a directory (default is json)"))
438 .arg(Arg::new("url")
439 .short('u')
440 .long("url")
441 .required_unless_present_any(&["file", "dir", "broker-url"])
442 .action(ArgAction::Append)
443 .value_parser(clap::builder::NonEmptyStringValueParser::new())
444 .help("URL of pact file to fetch (can be repeated)"))
445 .arg(Arg::new("broker-url")
446 .short('b')
447 .long("broker-url")
448 .env("PACT_BROKER_BASE_URL")
449 .required_unless_present_any(&["file", "dir", "url"])
450 .value_parser(clap::builder::NonEmptyStringValueParser::new())
451 .help("URL of the pact broker to fetch pacts from"))
452 .arg(Arg::new("user")
453 .long("user")
454 .value_parser(clap::builder::NonEmptyStringValueParser::new())
455 .conflicts_with("token")
456 .help("User and password to use when fetching pacts from URLS or Pact Broker in user:password form"))
457 .arg(Arg::new("token")
458 .short('t')
459 .long("token")
460 .value_parser(clap::builder::NonEmptyStringValueParser::new())
461 .conflicts_with("user")
462 .help("Bearer token to use when fetching pacts from URLS or Pact Broker"))
463 .arg(Arg::new("port")
464 .short('p')
465 .long("port")
466 .use_value_delimiter(false)
467 .help("Port to run on (defaults to random port assigned by the OS)")
468 .value_parser(integer_value))
469 .arg(Arg::new("cors")
470 .short('o')
471 .long("cors")
472 .action(ArgAction::SetTrue)
473 .help("Automatically respond to OPTIONS requests and return default CORS headers"))
474 .arg(Arg::new("cors-referer")
475 .long("cors-referer")
476 .requires("cors")
477 .action(ArgAction::SetTrue)
478 .help("Set the CORS Access-Control-Allow-Origin header to the Referer"))
479 .arg(Arg::new("insecure-tls")
480 .long("insecure-tls")
481 .action(ArgAction::SetTrue)
482 .help("Disables TLS certificate validation"))
483 .arg(Arg::new("provider-state")
484 .short('s')
485 .long("provider-state")
486 .value_parser(regex_value)
487 .help("Provider state regular expression to filter the responses by"))
488 .arg(Arg::new("provider-state-header-name")
489 .long("provider-state-header-name")
490 .value_parser(clap::builder::NonEmptyStringValueParser::new())
491 .help("Name of the header parameter containing the provider state to be used in case \
492 multiple matching interactions are found"))
493 .arg(Arg::new("empty-provider-state")
494 .long("empty-provider-state")
495 .requires("provider-state")
496 .action(ArgAction::SetTrue)
497 .help("Include empty provider states when filtering with --provider-state"))
498 .arg(Arg::new("consumer-name")
499 .long("consumer-name")
500 .alias("consumer-names")
501 .requires("broker-url")
502 .action(ArgAction::Append)
503 .value_parser(regex_value)
504 .help("Consumer name or regex to use to filter the Pacts fetched from the Pact broker (can be repeated)"))
505 .arg(Arg::new("provider-name")
506 .long("provider-name")
507 .alias("provider-names")
508 .requires("broker-url")
509 .action(ArgAction::Append)
510 .value_parser(regex_value)
511 .help("Provider name or regex to use to filter the Pacts fetched from the Pact broker (can be repeated)"))
512 .arg(Arg::new("watch")
513 .short('w')
514 .long("watch")
515 .action(ArgAction::SetTrue)
516 .help("Watch for changes in pact files and reload automatically"))
517 .arg(Arg::new("version")
518 .short('v')
519 .long("version")
520 .action(ArgAction::Version)
521 .help("Print version information"))
522}
523
524fn setup_logger(level: &str) {
525 let log_level = match level {
526 "none" => LevelFilter::OFF,
527 _ => LevelFilter::from_str(level).unwrap_or(LevelFilter::INFO)
528 };
529 let subscriber = FmtSubscriber::builder()
530 .compact()
531 .with_max_level(log_level)
532 .with_thread_names(true)
533 .finish();
534 if let Err(err) = tracing::subscriber::set_global_default(subscriber) {
535 eprintln!("ERROR: Failed to initialise global tracing subscriber - {err}");
536 };
537}
538
539#[cfg(test)]
540mod test;