use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use axum::body::Body;
use axum::extract::{Path, State};
use axum::http::{header, StatusCode};
use axum::middleware::{self, Next};
use axum::response::{IntoResponse, Response};
use axum::routing::{get, post};
use axum::Router;
use tokio::net::TcpListener;
use crate::fixture::Fixture;
use crate::format::IdGenerator;
/// Client registration handed to the embedded mock OAuth server.
///
/// Only available when the `oauth` feature is enabled. `ServerBuilder::build`
/// passes all four fields to the mock OAuth server's `with_client` call.
#[cfg(feature = "oauth")]
#[derive(Clone)]
pub struct OAuthConfig {
/// Client identifier registered with the mock OAuth server.
pub client_id: String,
/// Client secret registered with the mock OAuth server.
pub client_secret: String,
/// Redirect URIs registered for the client.
pub redirect_uris: Vec<String>,
/// Scopes registered for the client.
pub scopes: Vec<String>,
}
#[cfg(feature = "oauth")]
impl Default for OAuthConfig {
    /// Returns a canned client registration: id `mock-client`, secret
    /// `mock-secret`, a single `https://example.com/callback` redirect URI and
    /// the `openid`, `profile` and `email` scopes.
    fn default() -> Self {
        let redirect_uris = vec![String::from("https://example.com/callback")];
        let scopes = ["openid", "profile", "email"]
            .iter()
            .map(|scope| String::from(*scope))
            .collect();
        OAuthConfig {
            client_id: String::from("mock-client"),
            client_secret: String::from("mock-secret"),
            redirect_uris,
            scopes,
        }
    }
}
/// A set of fixtures plus two precomputed lookup orders.
///
/// `primary_order` and `catch_all_order` hold indices into `fixtures`; both are
/// filled in by `FixtureSet::new`.
pub(crate) struct FixtureSet {
// All fixtures, in the order they were supplied.
fixtures: Vec<Arc<Fixture>>,
// Indices of fixtures whose `catch_all` flag is false, highest priority first.
primary_order: Vec<usize>,
// Indices of fixtures whose `catch_all` flag is true, highest priority first.
catch_all_order: Vec<usize>,
}
impl FixtureSet {
    /// Builds a set from `fixtures`, precomputing the two lookup orders.
    ///
    /// Fixtures are split by their `catch_all` flag. Within each group the
    /// indices are stably sorted by descending priority, where a missing
    /// priority counts as 0, so equal-priority fixtures keep insertion order.
    pub(crate) fn new(fixtures: Vec<Arc<Fixture>>) -> Self {
        // One pass over the positions; both halves start in ascending index order.
        let (mut primary_order, mut catch_all_order): (Vec<usize>, Vec<usize>) =
            (0..fixtures.len()).partition(|&idx| !fixtures[idx].catch_all);

        for order in [&mut primary_order, &mut catch_all_order] {
            order.sort_by_key(|&idx| std::cmp::Reverse(fixtures[idx].priority.unwrap_or(0)));
        }

        Self {
            fixtures,
            primary_order,
            catch_all_order,
        }
    }

    /// Total number of fixtures, catch-all or not.
    pub(crate) fn len(&self) -> usize {
        self.fixtures.len()
    }

    /// Returns the first fixture accepted by `predicate`.
    ///
    /// Primary fixtures are tried first in priority order; catch-all fixtures
    /// are only consulted when no primary fixture is accepted.
    pub(crate) fn find_match(&self, predicate: impl Fn(&Fixture) -> bool) -> Option<&Arc<Fixture>> {
        self.primary_order
            .iter()
            .chain(self.catch_all_order.iter())
            .map(|&idx| &self.fixtures[idx])
            .find(|candidate| predicate(candidate))
    }

    /// Iterates over every fixture in the order it was supplied.
    #[allow(dead_code)]
    pub(crate) fn iter_all(&self) -> impl Iterator<Item = &Arc<Fixture>> {
        self.fixtures.iter()
    }

    /// Iterates over the non-catch-all fixtures in lookup order.
    #[allow(dead_code)]
    pub(crate) fn primary_iter(&self) -> impl Iterator<Item = &Arc<Fixture>> {
        self.primary_order
            .iter()
            .map(move |&idx| &self.fixtures[idx])
    }

    /// Iterates over the catch-all fixtures in lookup order.
    #[allow(dead_code)]
    pub(crate) fn catch_all_iter(&self) -> impl Iterator<Item = &Arc<Fixture>> {
        self.catch_all_order
            .iter()
            .map(move |&idx| &self.fixtures[idx])
    }

    /// Like `primary_iter`, but pairs each fixture with its original index.
    #[allow(dead_code)]
    pub(crate) fn primary_iter_indexed(&self) -> impl Iterator<Item = (usize, &Arc<Fixture>)> {
        self.primary_order
            .iter()
            .map(move |&idx| (idx, &self.fixtures[idx]))
    }

    /// Like `catch_all_iter`, but pairs each fixture with its original index.
    #[allow(dead_code)]
    pub(crate) fn catch_all_iter_indexed(&self) -> impl Iterator<Item = (usize, &Arc<Fixture>)> {
        self.catch_all_order
            .iter()
            .map(move |&idx| (idx, &self.fixtures[idx]))
    }
}
impl Default for FixtureSet {
fn default() -> Self {
Self::new(Vec::new())
}
}
/// State shared between the request handlers, the middleware and the
/// background reload tasks of one mock server.
pub(crate) struct AppState {
// Active fixture set; replaced wholesale by `swap_fixtures_unchecked`.
pub(crate) fixtures: std::sync::RwLock<FixtureSet>,
// ID generator from `crate::format` (usage not visible in this file).
pub(crate) id_gen: IdGenerator,
// Verbose-logging flag copied from the builder.
pub(crate) verbose: bool,
// Source of the numeric suffix in `next_request_id`.
pub(crate) request_counter: AtomicU64,
// NOTE(review): not read in this file; presumably used by chaos/failure injection — confirm.
pub(crate) chaos_counter: AtomicU64,
#[allow(dead_code)]
pub(crate) capture_counter: AtomicU64,
// Source of the numeric suffix in moderation response IDs.
pub(crate) moderation_counter: AtomicU64,
// `Some` only when the builder enabled authentication.
pub(crate) auth: Option<crate::auth::AuthState>,
// String-to-string map read by `MockServer::scenario_state`, keyed by scenario name.
pub(crate) scenarios: std::sync::RwLock<std::collections::HashMap<String, String>>,
// Requests recorded so far; exposed through `MockServer::get_requests`.
pub(crate) captured_requests: std::sync::RwLock<std::collections::VecDeque<CapturedRequest>>,
// NOTE(review): presumably the maximum number of captured requests kept — enforced outside this file, confirm.
pub(crate) capture_capacity: Option<usize>,
// When `Some`, `/v1/models` lists exactly these IDs instead of deriving them from fixtures.
pub(crate) explicit_models: Option<Vec<String>>,
// Diagnostics flag copied from the builder (consumer not visible in this file).
pub(crate) diagnostics: bool,
#[allow(dead_code)]
pub(crate) boot_instant: std::time::Instant,
// Milliseconds since the Unix epoch, sampled when the server was built.
#[allow(dead_code)]
pub(crate) boot_epoch_ms: u64,
// Broadcast sender for UI events; `Some` only when the UI was enabled on the builder.
#[cfg(feature = "ui")]
pub(crate) ui_tx: Option<tokio::sync::broadcast::Sender<crate::ui::UiEvent>>,
}
/// Classification recorded for each captured request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum RequestOutcome {
    /// Labelled `matched`; default status 200.
    Matched,
    /// Labelled `no_match`; default status 404.
    NoFixtureMatch,
    /// The request was rejected as malformed (invalid JSON, invalid input,
    /// invalid status code); default status 400.
    BadRequest,
    /// Labelled `auth_rejected`; default status 401.
    AuthRejected,
    /// The request was served by the `/code/{status}` endpoint.
    CodeEndpoint,
    /// The request was served by the `/v1/moderations` endpoint.
    ModerationEndpoint,
}

impl RequestOutcome {
    /// Short, stable, snake_case label for this outcome.
    pub fn label(&self) -> &'static str {
        use RequestOutcome::*;
        match *self {
            Matched => "matched",
            NoFixtureMatch => "no_match",
            BadRequest => "bad_request",
            AuthRejected => "auth_rejected",
            CodeEndpoint => "code_endpoint",
            ModerationEndpoint => "moderation",
        }
    }

    /// HTTP status code conventionally associated with this outcome.
    pub fn default_status(&self) -> u16 {
        use RequestOutcome::*;
        match *self {
            Matched | CodeEndpoint | ModerationEndpoint => 200,
            BadRequest => 400,
            AuthRejected => 401,
            NoFixtureMatch => 404,
        }
    }
}
/// A record of one request seen by the mock server, as returned by
/// `MockServer::get_requests`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct CapturedRequest {
/// HTTP method, e.g. `"POST"`.
pub method: String,
/// Request path, e.g. `"/v1/moderations"`.
pub path: String,
/// Request body as text (empty for the `/code/{status}` endpoint).
pub body: String,
/// How the server classified the request.
pub outcome: RequestOutcome,
/// NOTE(review): presumably the scenario of the matched fixture, if any — set outside this file, confirm.
pub matched_scenario: Option<String>,
/// NOTE(review): presumably a per-server sequence number — assigned outside this file, confirm.
pub capture_id: u64,
/// HTTP status code recorded for the response.
pub status_code: u16,
/// NOTE(review): presumably the capture time — assigned outside this file, confirm.
pub timestamp: std::time::Instant,
}
impl CapturedRequest {
    /// Returns `true` when this request's outcome is [`RequestOutcome::Matched`].
    pub fn was_matched(&self) -> bool {
        matches!(self.outcome, RequestOutcome::Matched)
    }
}
impl AppState {
    /// Produces the next `req-llmposter-<n>` identifier.
    ///
    /// `<n>` is the counter value before the increment.
    pub(crate) fn next_request_id(&self) -> String {
        let sequence = self.request_counter.fetch_add(1, Ordering::Relaxed);
        format!("req-llmposter-{}", sequence)
    }

    /// Validates every fixture and, if all pass, installs them as the active set.
    ///
    /// # Errors
    ///
    /// Returns `Fixture #<n>: <reason>` (1-based) for the first fixture that
    /// fails validation; the active set is left untouched in that case.
    pub(crate) fn set_fixtures(
        &self,
        mut fixtures: Vec<Fixture>,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        for (position, candidate) in fixtures.iter_mut().enumerate() {
            if let Err(reason) = candidate.validate() {
                return Err(format!("Fixture #{}: {}", position + 1, reason).into());
            }
        }
        self.swap_fixtures_unchecked(fixtures);
        Ok(())
    }

    /// Replaces the active fixture set without validating the new fixtures.
    ///
    /// The new set is fully built before the write lock is taken. A poisoned
    /// lock is recovered rather than propagated.
    pub(crate) fn swap_fixtures_unchecked(&self, fixtures: Vec<Fixture>) {
        let replacement = FixtureSet::new(fixtures.into_iter().map(Arc::new).collect());
        let mut active = self
            .fixtures
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *active = replacement;
    }
}
/// What caused a fixture reload; used only to label log lines.
#[cfg(any(feature = "watch", unix))]
#[derive(Debug, Clone, Copy)]
enum ReloadTrigger {
    /// A change reported by the file watcher.
    #[cfg(feature = "watch")]
    Watch,
    /// A SIGHUP delivered to the process.
    #[cfg(unix)]
    Sighup,
}

#[cfg(any(feature = "watch", unix))]
impl ReloadTrigger {
    /// Text used for this trigger in reload log messages.
    fn label(self) -> &'static str {
        let text = match self {
            #[cfg(feature = "watch")]
            ReloadTrigger::Watch => "watch",
            #[cfg(unix)]
            ReloadTrigger::Sighup => "SIGHUP",
        };
        text
    }
}
/// Re-reads fixtures from `sources` and installs them on `state`.
///
/// On a load failure the error is logged to stderr and the current fixtures
/// stay in place. On success the new fixtures are installed through
/// `swap_fixtures_unchecked`, and a confirmation line is logged when `verbose`
/// is set. `trigger` only affects the log text.
#[cfg(any(feature = "watch", unix))]
fn reload_and_swap(
    state: &AppState,
    sources: &[std::path::PathBuf],
    verbose: bool,
    trigger: ReloadTrigger,
) {
    let label = trigger.label();

    let fixtures = match crate::fixture::reload_sources(sources) {
        Ok(loaded) => loaded,
        Err(e) => {
            eprintln!(
                "[llmposter] {} reload parse failed, keeping old fixtures: {}",
                label, e
            );
            return;
        }
    };

    state.swap_fixtures_unchecked(fixtures);
    if verbose {
        eprintln!("[llmposter] {} reload: fixtures swapped", label);
    }
}
/// Starts a debounced file watcher over `sources` and a thread that reloads
/// fixtures whenever a change is reported.
///
/// Directories are watched recursively, files non-recursively. Sources that
/// cannot be registered are logged and skipped. Returns `None` when the
/// debouncer cannot be created or when no source could be registered;
/// otherwise returns the handle of the worker thread.
#[cfg(feature = "watch")]
fn spawn_file_watcher(
    state: std::sync::Weak<AppState>,
    sources: Vec<std::path::PathBuf>,
    verbose: bool,
) -> Option<std::thread::JoinHandle<()>> {
    use notify_debouncer_mini::{new_debouncer, notify::RecursiveMode};
    use std::time::Duration;

    let (event_tx, event_rx) = std::sync::mpsc::channel();
    let mut debouncer = match new_debouncer(Duration::from_millis(250), event_tx) {
        Ok(created) => created,
        Err(e) => return log_watcher_setup_failure(e),
    };

    let mut registered = 0usize;
    for source in &sources {
        let mode = if source.is_dir() {
            RecursiveMode::Recursive
        } else {
            RecursiveMode::NonRecursive
        };
        if let Err(e) = debouncer.watcher().watch(source, mode) {
            eprintln!(
                "[llmposter] file watcher failed to watch {}: {}",
                source.display(),
                e
            );
            continue;
        }
        registered += 1;
    }

    if registered == 0 {
        eprintln!(
            "[llmposter] file watcher: no sources could be registered ({} tried), giving up",
            sources.len()
        );
        return None;
    }

    let worker = std::thread::spawn(move || {
        // Own the debouncer for the lifetime of the thread so it is not
        // dropped while the loop is still receiving events.
        let _keep_alive = debouncer;
        watcher_loop(&state, &sources, verbose, &event_rx);
    });
    Some(worker)
}
/// Logs a debouncer construction failure to stderr.
///
/// Always returns `None`, so callers can `return` the result directly from a
/// function that yields an optional watcher thread handle.
#[cfg(feature = "watch")]
fn log_watcher_setup_failure(
    e: notify_debouncer_mini::notify::Error,
) -> Option<std::thread::JoinHandle<()>> {
    let message = format!("[llmposter] file watcher setup failed: {}", e);
    eprintln!("{}", message);
    None
}
/// Body of the file-watcher thread.
///
/// Waits on `rx` in 500 ms slices. Each batch of debounced events triggers a
/// fixture reload; watcher errors are logged and ignored. The loop ends when
/// `state` can no longer be upgraded (the server is gone) or when the sending
/// side of `rx` has disconnected.
#[cfg(feature = "watch")]
fn watcher_loop(
    state: &std::sync::Weak<AppState>,
    sources: &[std::path::PathBuf],
    verbose: bool,
    rx: &std::sync::mpsc::Receiver<
        Result<Vec<notify_debouncer_mini::DebouncedEvent>, notify_debouncer_mini::notify::Error>,
    >,
) {
    use std::sync::mpsc::RecvTimeoutError;

    const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);

    // Re-check liveness before every wait so the thread exits within one poll
    // interval of the server being dropped.
    while state.upgrade().is_some() {
        match rx.recv_timeout(POLL_INTERVAL) {
            Ok(Ok(_events)) => match state.upgrade() {
                Some(app) => reload_and_swap(&app, sources, verbose, ReloadTrigger::Watch),
                None => return,
            },
            Ok(Err(e)) => eprintln!("[llmposter] file watcher error: {}", e),
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => return,
        }
    }
}
/// Logs a failure to install the SIGHUP listener to stderr.
#[cfg(unix)]
fn log_sighup_setup_failure(e: std::io::Error) {
    let message = format!("[llmposter] SIGHUP handler setup failed: {}", e);
    eprintln!("{}", message);
}
/// Logs the join error of a failed SIGHUP reload worker to stderr.
#[cfg(unix)]
fn log_sighup_reload_panic(e: tokio::task::JoinError) {
    let message = format!("[llmposter] SIGHUP reload worker panicked: {}", e);
    eprintln!("{}", message);
}
/// Spawns a Tokio task that reloads fixtures from `sources` whenever the
/// process receives SIGHUP (Unix only).
///
/// The task holds only a `Weak` reference between iterations and returns once
/// `state` can no longer be upgraded. If the signal listener cannot be
/// installed, the failure is logged and the task ends immediately.
#[cfg(unix)]
fn spawn_sighup_handler(
state: std::sync::Weak<AppState>,
sources: Vec<std::path::PathBuf>,
verbose: bool,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
use tokio::signal::unix::{signal, SignalKind};
let mut sig = match signal(SignalKind::hangup()) {
Ok(s) => s,
Err(e) => return log_sighup_setup_failure(e),
};
// Periodic wake-up so the liveness check below runs even when no signal arrives.
let mut shutdown_check = tokio::time::interval(std::time::Duration::from_millis(500));
// NOTE(review): presumably consumes the interval's immediate first tick — confirm against tokio docs.
shutdown_check.tick().await;
loop {
// Stop once every strong reference to the state is gone. The strong
// reference taken here is held until the end of this iteration.
let Some(arc_tick) = state.upgrade() else {
return;
};
// NOTE(review): if `sig.recv()` could ever yield `None`, this select would
// complete immediately on every iteration — confirm whether that can happen.
let got_signal = tokio::select! {
recv = sig.recv() => recv.is_some(),
_ = shutdown_check.tick() => false,
};
if got_signal {
let arc = arc_tick.clone();
let sources_clone = sources.clone();
// Run the reload on the blocking pool rather than inside this async task.
let blocking = tokio::task::spawn_blocking(move || {
reload_and_swap(&arc, &sources_clone, verbose, ReloadTrigger::Sighup);
});
// A join error here means the reload closure did not complete; log it and keep serving.
if let Err(e) = blocking.await {
log_sighup_reload_panic(e);
}
}
}
})
}
/// Formats seconds since the Unix epoch as an RFC 3339 UTC timestamp
/// (`YYYY-MM-DDTHH:MM:SSZ`).
///
/// The date is computed with integer arithmetic only, using a calendar that
/// starts each year on 1 March so the leap day falls at the end of the year.
fn format_rfc3339_utc(epoch_secs: u64) -> String {
    const SECS_PER_DAY: u64 = 86_400;
    const DAYS_PER_ERA: u64 = 146_097; // one 400-year Gregorian cycle

    // Time of day.
    let second_of_day = epoch_secs % SECS_PER_DAY;
    let (hour, min, sec) = (
        second_of_day / 3600,
        second_of_day % 3600 / 60,
        second_of_day % 60,
    );

    // Day count re-based from 1970-01-01 to 0000-03-01.
    let shifted_days = epoch_secs / SECS_PER_DAY + 719_468;
    let era = shifted_days / DAYS_PER_ERA;
    let day_of_era = shifted_days % DAYS_PER_ERA;

    // Remove the leap days accumulated inside the era before dividing by 365.
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // Month index with March = 0 … February = 11.
    let march_based_month = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * march_based_month + 2) / 5 + 1;
    let month = if march_based_month < 10 {
        march_based_month + 3
    } else {
        march_based_month - 9
    };

    // January and February belong to the following civil year.
    let year = year_of_era + era * 400 + u64::from(month <= 2);

    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
        year, month, day, hour, min, sec
    )
}
/// `GET /health`: always answers with the JSON object `{"status": "ok"}`.
async fn handle_health() -> axum::Json<serde_json::Value> {
    let payload = serde_json::json!({ "status": "ok" });
    axum::Json(payload)
}
/// `GET /v1/models`: returns a model list object.
///
/// When an explicit model list was configured it is returned verbatim.
/// Otherwise the list is derived from the loaded fixtures: every distinct
/// substring-style model matcher contributes one entry, in first-seen order.
/// Each entry carries the current Unix timestamp and `owned_by: "llmposter"`.
async fn handle_models(State(state): State<Arc<AppState>>) -> axum::Json<serde_json::Value> {
    let model_ids: Vec<String> = match state.explicit_models {
        Some(ref explicit) => explicit.clone(),
        None => {
            let guard = state.fixtures.read().unwrap_or_else(|e| e.into_inner());
            let mut already_listed = std::collections::HashSet::new();
            let mut derived = Vec::new();
            for fixture in guard.iter_all() {
                let Some(ref rule) = fixture.match_rule else {
                    continue;
                };
                let Some(crate::fixture::StringMatch::Substring(ref model)) = rule.model else {
                    continue;
                };
                if already_listed.insert(model.clone()) {
                    derived.push(model.clone());
                }
            }
            derived
        }
    };

    let created = crate::format::openai::unix_timestamp();
    let mut data: Vec<serde_json::Value> = Vec::with_capacity(model_ids.len());
    for id in &model_ids {
        data.push(serde_json::json!({
            "id": id,
            "object": "model",
            "created": created,
            "owned_by": "llmposter"
        }));
    }

    axum::Json(serde_json::json!({
        "object": "list",
        "data": data
    }))
}
/// `POST /v1/moderations`: returns a canned "nothing flagged" moderation result.
///
/// The body must be JSON whose `input` is either a string or a non-empty array
/// of strings; anything else is captured as a bad request and answered with
/// HTTP 400. Valid requests are captured with status 200 and answered with a
/// single result in which every category is `false` with a score of `0.0001`.
/// The echoed `model` defaults to `text-moderation-latest`.
async fn handle_moderations(State(state): State<Arc<AppState>>, body: String) -> Response<Body> {
    // Shared rejection path: record the request, then build the 400 response.
    let reject = |message: &'static str| -> Response<Body> {
        crate::handler::capture_non_matched(
            &state,
            "POST",
            "/v1/moderations",
            &body,
            RequestOutcome::BadRequest,
        );
        (
            StatusCode::BAD_REQUEST,
            [(header::CONTENT_TYPE, "application/json")],
            crate::failure::build_error_body(400, message),
        )
            .into_response()
    };

    let Ok(json_body) = serde_json::from_str::<serde_json::Value>(&body) else {
        return reject("Invalid JSON");
    };

    let input_is_valid = match json_body.get("input") {
        Some(serde_json::Value::String(_)) => true,
        Some(serde_json::Value::Array(items)) => {
            !items.is_empty() && items.iter().all(serde_json::Value::is_string)
        }
        _ => false,
    };
    if !input_is_valid {
        return reject("'input' must be a non-empty string or array of strings");
    }

    let model = match json_body.get("model").and_then(serde_json::Value::as_str) {
        Some(requested) => requested,
        None => "text-moderation-latest",
    };
    let sequence = state.moderation_counter.fetch_add(1, Ordering::Relaxed);
    let id = format!("modr-llmposter-{}", sequence);

    crate::handler::push_captured(
        &state,
        "POST",
        "/v1/moderations",
        body,
        RequestOutcome::ModerationEndpoint,
        None,
        200,
    );

    let resp = serde_json::json!({
        "id": id,
        "model": model,
        "results": [{
            "flagged": false,
            "categories": {
                "hate": false, "hate/threatening": false,
                "harassment": false, "harassment/threatening": false,
                "self-harm": false, "self-harm/intent": false,
                "self-harm/instructions": false,
                "sexual": false, "sexual/minors": false,
                "violence": false, "violence/graphic": false
            },
            "category_scores": {
                "hate": 0.0001, "hate/threatening": 0.0001,
                "harassment": 0.0001, "harassment/threatening": 0.0001,
                "self-harm": 0.0001, "self-harm/intent": 0.0001,
                "self-harm/instructions": 0.0001,
                "sexual": 0.0001, "sexual/minors": 0.0001,
                "violence": 0.0001, "violence/graphic": 0.0001
            }
        }]
    });

    let ok_response = (
        StatusCode::OK,
        [(header::CONTENT_TYPE, "application/json")],
        resp.to_string(),
    );
    ok_response.into_response()
}
/// `GET /code/{status}`: answers with the requested HTTP status code.
///
/// Codes that parse as a valid status no greater than 599 are echoed back;
/// anything else is answered with HTTP 400. Every request is captured with the
/// status that is about to be sent. Informational (1xx), 204, 205 and 304
/// responses carry no body; all others carry a small JSON description, and
/// redirects additionally get `Location: /`.
async fn handle_status_code(
    State(state): State<Arc<AppState>>,
    Path(raw_code): Path<String>,
) -> Response<Body> {
    let validated = raw_code
        .parse::<u16>()
        .ok()
        .and_then(|code| StatusCode::from_u16(code).ok())
        .filter(|status| status.as_u16() <= 599);

    let (outcome, status_code) = match validated {
        Some(status) => (RequestOutcome::CodeEndpoint, status.as_u16()),
        None => (RequestOutcome::BadRequest, 400),
    };
    crate::handler::push_captured(
        &state,
        "GET",
        &format!("/code/{}", raw_code),
        String::new(),
        outcome,
        None,
        status_code,
    );

    let Some(status) = validated else {
        return Response::builder()
            .status(StatusCode::BAD_REQUEST)
            .header(header::CONTENT_TYPE, "application/json")
            .body(Body::from(
                r#"{"code":400,"description":"Invalid status code — use 100-599"}"#,
            ))
            .expect("static headers");
    };

    // These statuses must not carry a response body.
    let bodyless = status.is_informational()
        || [
            StatusCode::NO_CONTENT,
            StatusCode::RESET_CONTENT,
            StatusCode::NOT_MODIFIED,
        ]
        .contains(&status);
    if bodyless {
        return Response::builder()
            .status(status)
            .body(Body::empty())
            .expect("static headers");
    }

    let payload = serde_json::json!({
        "code": status.as_u16(),
        "description": status.canonical_reason().unwrap_or("Unknown")
    })
    .to_string();

    let builder = Response::builder()
        .status(status)
        .header(header::CONTENT_TYPE, "application/json");
    let builder = if status.is_redirection() {
        builder.header(header::LOCATION, "/")
    } else {
        builder
    };
    builder.body(Body::from(payload)).expect("static headers")
}
/// Middleware that decorates every response after the inner service has run.
///
/// Always sets `x-request-id` to a fresh request ID. For HTTP 429 responses it
/// additionally fills in `retry-after: 60` and provider-specific rate-limit
/// headers, chosen by the `Provider` stored in the response extensions.
/// Rate-limit headers already present on the response are left untouched.
async fn add_response_headers(
    State(state): State<Arc<AppState>>,
    request: axum::extract::Request,
    next: Next,
) -> axum::response::Response {
    /// Sets `name` to `value` unless the header is already present.
    fn default_header(headers: &mut axum::http::HeaderMap, name: &'static str, value: &str) {
        headers.entry(name).or_insert(value.parse().unwrap());
    }

    let mut resp = next.run(request).await;

    let request_id = state.next_request_id();
    resp.headers_mut()
        .insert("x-request-id", request_id.parse().unwrap());

    if resp.status() != StatusCode::TOO_MANY_REQUESTS {
        return resp;
    }

    // Read the provider before mutably borrowing the header map.
    let provider = resp.extensions().get::<crate::format::Provider>().copied();
    let headers = resp.headers_mut();
    default_header(headers, "retry-after", "60");

    match provider {
        Some(crate::format::Provider::Anthropic) => {
            // Anthropic-style reset is an absolute timestamp, 60 seconds from now.
            let now_secs = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs();
            let reset_ts = format_rfc3339_utc(now_secs + 60);
            default_header(headers, "anthropic-ratelimit-requests-limit", "100");
            default_header(headers, "anthropic-ratelimit-requests-remaining", "0");
            default_header(headers, "anthropic-ratelimit-requests-reset", &reset_ts);
        }
        Some(crate::format::Provider::OpenAI) | Some(crate::format::Provider::Responses) => {
            default_header(headers, "x-ratelimit-limit-requests", "100");
            default_header(headers, "x-ratelimit-remaining-requests", "0");
            default_header(headers, "x-ratelimit-reset-requests", "1m0s");
        }
        // No provider-specific headers for Gemini or for untagged responses.
        Some(crate::format::Provider::Gemini) | None => {}
    }

    resp
}
/// Reports how the serve loop ended.
///
/// A successful result is ignored. A failure is logged to stderr and forwarded
/// through `err_tx`; a send failure (receiver already gone) is ignored.
fn report_serve_result(result: std::io::Result<()>, err_tx: tokio::sync::oneshot::Sender<String>) {
    let Err(e) = result else {
        return;
    };
    let msg = format!("[llmposter] server error: {}", e);
    eprintln!("{}", msg);
    let _ = err_tx.send(msg);
}
/// Builder for a [`MockServer`].
///
/// Collects fixtures and options through chained setters; `build` validates
/// the fixtures and starts the server.
pub struct ServerBuilder {
// Fixtures added directly or loaded from YAML.
fixtures: Vec<Fixture>,
// Files and directories the fixtures were loaded from; used for reloads.
fixture_sources: Vec<std::path::PathBuf>,
// Whether to start the file watcher over `fixture_sources`.
#[cfg(feature = "watch")]
watch_enabled: bool,
// Address passed to `TcpListener::bind`; defaults to `127.0.0.1:0`.
bind_addr: String,
verbose: bool,
// Set explicitly via `with_auth`, or implicitly by the token/OAuth setters.
auth_enabled: bool,
// (token, optional maximum number of uses) pairs.
bearer_tokens: Vec<(String, Option<u64>)>,
#[cfg(feature = "oauth")]
oauth_config: Option<OAuthConfig>,
capture_capacity: Option<usize>,
// Explicit model list; only honoured when `models_override_set` is true.
models: Vec<String>,
// Distinguishes "models() never called" from "models() called with an empty list".
models_override_set: bool,
diagnostics: bool,
#[cfg(feature = "ui")]
ui_enabled: bool,
}
impl ServerBuilder {
/// Creates a builder with no fixtures, binding to `127.0.0.1:0`, with
/// authentication, verbose logging, diagnostics and every optional feature
/// switched off.
pub fn new() -> Self {
Self {
fixtures: Vec::new(),
fixture_sources: Vec::new(),
#[cfg(feature = "watch")]
watch_enabled: false,
bind_addr: "127.0.0.1:0".to_string(),
verbose: false,
auth_enabled: false,
bearer_tokens: Vec::new(),
#[cfg(feature = "oauth")]
oauth_config: None,
capture_capacity: None,
models: Vec::new(),
models_override_set: false,
diagnostics: false,
#[cfg(feature = "ui")]
ui_enabled: false,
}
}
/// Sets the capture capacity stored on the server state.
pub fn capture_capacity(mut self, max: usize) -> Self {
self.capture_capacity = Some(max);
self
}
/// Overrides the model list returned by `GET /v1/models`.
///
/// Calling this, even with an empty list, disables deriving the list from
/// the fixtures.
pub fn models(mut self, models: Vec<String>) -> Self {
self.models = models;
self.models_override_set = true;
self
}
/// Sets the diagnostics flag stored on the server state.
pub fn diagnostics(mut self, enabled: bool) -> Self {
self.diagnostics = enabled;
self
}
/// Adds a single fixture.
pub fn fixture(mut self, f: Fixture) -> Self {
self.fixtures.push(f);
self
}
/// Appends several fixtures, keeping any that were added earlier.
pub fn fixtures(mut self, fixtures: Vec<Fixture>) -> Self {
self.fixtures.extend(fixtures);
self
}
/// Number of fixtures collected so far.
pub fn fixture_count(&self) -> usize {
self.fixtures.len()
}
/// Sets the address the server binds to.
pub fn bind(mut self, addr: &str) -> Self {
self.bind_addr = addr.to_string();
self
}
/// Enables or disables verbose logging.
pub fn verbose(mut self, v: bool) -> Self {
self.verbose = v;
self
}
/// Enables or disables authentication.
pub fn with_auth(mut self, enabled: bool) -> Self {
self.auth_enabled = enabled;
self
}
/// Registers a bearer token with no use limit and enables authentication.
pub fn with_bearer_token(mut self, token: &str) -> Self {
self.auth_enabled = true;
self.bearer_tokens.push((token.to_string(), None));
self
}
/// Registers a bearer token limited to `max_uses` uses and enables authentication.
pub fn with_bearer_token_uses(mut self, token: &str, max_uses: u64) -> Self {
self.auth_enabled = true;
self.bearer_tokens.push((token.to_string(), Some(max_uses)));
self
}
/// Configures the embedded mock OAuth server and enables authentication.
#[cfg(feature = "oauth")]
pub fn with_oauth(mut self, config: OAuthConfig) -> Self {
self.auth_enabled = true;
self.oauth_config = Some(config);
self
}
/// Same as `with_oauth`, using `OAuthConfig::default()`.
#[cfg(feature = "oauth")]
pub fn with_oauth_defaults(mut self) -> Self {
self.auth_enabled = true;
self.oauth_config = Some(OAuthConfig::default());
self
}
/// Loads fixtures from a YAML file and remembers the path as a reload source.
///
/// # Errors
///
/// Returns the loader's error if the file cannot be loaded.
pub fn load_yaml(mut self, path: &std::path::Path) -> Result<Self, Box<dyn std::error::Error>> {
let fixtures = crate::fixture::load_yaml_file(path)?;
self.fixtures.extend(fixtures);
self.fixture_sources.push(path.to_path_buf());
Ok(self)
}
/// Loads fixtures from a directory and remembers the directory as a reload source.
///
/// # Errors
///
/// Returns the loader's error if the directory cannot be loaded.
pub fn load_yaml_dir(
mut self,
dir: &std::path::Path,
) -> Result<Self, Box<dyn std::error::Error>> {
let fixtures = crate::fixture::load_yaml_dir(dir)?;
self.fixtures.extend(fixtures);
self.fixture_sources.push(dir.to_path_buf());
Ok(self)
}
/// Enables or disables the file watcher over the loaded fixture sources.
#[cfg(feature = "watch")]
pub fn watch(mut self, enabled: bool) -> Self {
self.watch_enabled = enabled;
self
}
/// Enables or disables the UI routes and the UI event channel.
#[cfg(feature = "ui")]
pub fn ui(mut self, enabled: bool) -> Self {
self.ui_enabled = enabled;
self
}
/// Validates the fixtures, starts the server and returns a handle to it.
///
/// # Errors
///
/// Fails if a fixture does not validate (`Fixture #<n>: …`, 1-based), if the
/// OAuth server or its introspection client cannot be created, or if the
/// listener cannot be bound to the configured address.
pub async fn build(mut self) -> Result<MockServer, Box<dyn std::error::Error>> {
for (i, fixture) in self.fixtures.iter_mut().enumerate() {
fixture
.validate()
.map_err(|e| format!("Fixture #{}: {}", i + 1, e))?;
}
// Start the embedded OAuth server first, if one was configured.
#[cfg(feature = "oauth")]
let oauth_server = if let Some(ref config) = self.oauth_config {
let redirect_uris: Vec<&str> =
config.redirect_uris.iter().map(String::as_str).collect();
let scopes: Vec<&str> = config.scopes.iter().map(String::as_str).collect();
let oauth = oauth_mock::MockServer::builder()
.with_client(
&config.client_id,
&config.client_secret,
redirect_uris,
scopes,
)
.spawn_on_free_port()
.await
.map_err(|e| format!("Failed to start OAuth server: {}", e))?;
Some(oauth)
} else {
None
};
// Auth state exists only when authentication was enabled on the builder.
let auth = if self.auth_enabled {
let auth_state = crate::auth::AuthState::new();
for (token, max_uses) in &self.bearer_tokens {
auth_state.add_token(token, *max_uses);
}
#[cfg(feature = "oauth")]
if let Some(ref oauth) = oauth_server {
if let Some(ref config) = self.oauth_config {
// Introspection requests go to the embedded OAuth server with a 5 s timeout.
auth_state.set_oauth_introspect(crate::auth::OAuthIntrospect {
url: format!("{}/introspect", oauth.base_url()),
client_id: config.client_id.clone(),
client_secret: config.client_secret.clone(),
client: reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(5))
.build()
.map_err(|e| {
format!("Failed to build OAuth introspect client: {}", e)
})?,
});
}
}
Some(auth_state)
} else {
None
};
// `None` means "derive the model list from fixtures".
let explicit_models = if self.models_override_set {
Some(self.models)
} else {
None
};
let arced_fixtures: Vec<Arc<Fixture>> = self.fixtures.into_iter().map(Arc::new).collect();
let boot_epoch_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
// Request and moderation counters start at 1, so the first generated IDs end in `-1`.
let state = Arc::new(AppState {
fixtures: std::sync::RwLock::new(FixtureSet::new(arced_fixtures)),
id_gen: IdGenerator::new(),
verbose: self.verbose,
request_counter: AtomicU64::new(1),
chaos_counter: AtomicU64::new(0),
capture_counter: AtomicU64::new(0),
moderation_counter: AtomicU64::new(1),
auth,
scenarios: std::sync::RwLock::new(std::collections::HashMap::new()),
captured_requests: std::sync::RwLock::new(std::collections::VecDeque::new()),
capture_capacity: self.capture_capacity,
explicit_models,
diagnostics: self.diagnostics,
boot_instant: std::time::Instant::now(),
boot_epoch_ms,
#[cfg(feature = "ui")]
ui_tx: if self.ui_enabled {
Some(tokio::sync::broadcast::channel(1024).0)
} else {
None
},
});
// Reload machinery is only started when fixtures came from files or directories.
// Both background tasks receive a `Weak` reference to the state.
if !self.fixture_sources.is_empty() {
#[cfg(feature = "watch")]
if self.watch_enabled {
let _watcher = spawn_file_watcher(
Arc::downgrade(&state),
self.fixture_sources.clone(),
self.verbose,
);
}
#[cfg(unix)]
let _sighup = spawn_sighup_handler(
Arc::downgrade(&state),
self.fixture_sources.clone(),
self.verbose,
);
}
// Keep one handle for the returned `MockServer`; `app` is only reassigned when the `ui` feature is on.
let server_state = state.clone(); #[allow(unused_mut)]
let mut app = Router::new()
.route("/v1/chat/completions", post(crate::handler::openai::handle))
.route("/v1/messages", post(crate::handler::anthropic::handle))
.route("/v1/responses", post(crate::handler::responses::handle))
.route(
"/v1beta/models/{*path}",
post(crate::handler::gemini::handle),
)
.route("/v1/completions", post(crate::handler::completions::handle))
.route("/v1/embeddings", post(crate::handler::embeddings::handle))
.route("/code/{status}", get(handle_status_code))
.route("/v1/models", get(handle_models))
.route("/v1/moderations", post(handle_moderations))
.route("/health", get(handle_health));
#[cfg(feature = "ui")]
if self.ui_enabled {
app = app.merge(crate::ui::ui_routes());
}
// Request bodies are limited to 16 MiB.
// NOTE(review): presumably the last-added layer runs outermost, so
// `add_response_headers` would also decorate auth rejections — confirm against axum docs.
let app = app
.layer(axum::extract::DefaultBodyLimit::max(16 * 1024 * 1024)) .layer(middleware::from_fn_with_state(
state.clone(),
crate::auth::bearer_auth_check,
))
.layer(middleware::from_fn_with_state(
state.clone(),
add_response_headers,
))
.with_state(state);
let listener = TcpListener::bind(&self.bind_addr).await?;
// Read back the bound address so callers can discover the port actually assigned.
let addr = listener.local_addr()?;
// A serve-loop failure is reported through this channel and surfaced by `MockServer::check_error`.
let (err_tx, err_rx) = tokio::sync::oneshot::channel::<String>();
let handle = tokio::spawn(async move {
report_serve_result(axum::serve(listener, app).await, err_tx);
});
Ok(MockServer {
addr,
_handle: handle,
server_error: tokio::sync::Mutex::new(err_rx),
state: server_state,
#[cfg(feature = "oauth")]
oauth_server,
})
}
}
impl Default for ServerBuilder {
fn default() -> Self {
Self::new()
}
}
/// Handle to a running mock server.
///
/// Dropping the handle aborts the serving task.
pub struct MockServer {
// Address the listener is actually bound to.
addr: std::net::SocketAddr,
// Task running the serve loop; aborted on drop.
_handle: tokio::task::JoinHandle<()>,
// Receives the error message if the serve loop fails.
server_error: tokio::sync::Mutex<tokio::sync::oneshot::Receiver<String>>,
// State shared with the request handlers.
state: Arc<AppState>,
// Embedded OAuth server, present only when OAuth was configured.
#[cfg(feature = "oauth")]
oauth_server: Option<oauth_mock::MockServer>,
}
impl std::fmt::Debug for MockServer {
    /// Formats the server as `MockServer { addr: … }`; other fields are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("MockServer");
        out.field("addr", &self.addr);
        out.finish()
    }
}
impl MockServer {
pub fn url(&self) -> String {
format!("http://{}", self.addr)
}
pub fn port(&self) -> u16 {
self.addr.port()
}
#[cfg(feature = "oauth")]
pub fn oauth_url(&self) -> Option<String> {
self.oauth_server.as_ref().map(|s| s.base_url().to_string())
}
#[cfg(feature = "oauth")]
pub async fn oauth_client_credentials(&self) -> Option<(String, String)> {
match &self.oauth_server {
Some(s) => s.default_client().await,
None => None,
}
}
#[cfg(feature = "oauth")]
pub async fn approve_device_code(
&self,
user_code: &str,
) -> Result<(), Box<dyn std::error::Error>> {
match &self.oauth_server {
Some(s) => Ok(s.approve_device_code(user_code).await?),
None => Err("OAuth not configured".into()),
}
}
pub async fn check_error(&self) -> Result<(), String> {
let mut rx = self.server_error.lock().await;
match rx.try_recv() {
Ok(msg) => Err(msg),
Err(tokio::sync::oneshot::error::TryRecvError::Empty) => Ok(()),
Err(tokio::sync::oneshot::error::TryRecvError::Closed) => Ok(()),
}
}
pub fn get_requests(&self) -> Vec<CapturedRequest> {
self.state
.captured_requests
.read()
.unwrap_or_else(|e| e.into_inner())
.iter()
.cloned()
.collect()
}
pub fn request_count(&self) -> usize {
self.state
.captured_requests
.read()
.unwrap_or_else(|e| e.into_inner())
.len()
}
pub fn fixture_count(&self) -> usize {
self.state
.fixtures
.read()
.unwrap_or_else(|e| e.into_inner())
.len()
}
pub fn explicit_models(&self) -> Option<&[String]> {
self.state.explicit_models.as_deref()
}
pub fn matched_requests(&self) -> Vec<CapturedRequest> {
self.get_requests()
.into_iter()
.filter(|r| r.outcome == RequestOutcome::Matched)
.collect()
}
pub fn matched_count(&self) -> usize {
self.get_requests()
.iter()
.filter(|r| r.outcome == RequestOutcome::Matched)
.count()
}
pub fn assert_matched(&self, substring: &str) {
let reqs = self.matched_requests();
if !reqs.iter().any(|r| r.body.contains(substring)) {
panic!(
"assert_matched: no matched request body contains {:?} ({} matched request{} captured)",
substring,
reqs.len(),
if reqs.len() == 1 { "" } else { "s" }
);
}
}
pub fn assert_not_matched(&self, substring: &str) {
let reqs = self.matched_requests();
if let Some(r) = reqs.iter().find(|r| r.body.contains(substring)) {
panic!(
"assert_not_matched: matched request body contains {:?} (path: {})",
substring, r.path
);
}
}
pub fn scenario_state(&self, name: &str) -> Option<String> {
self.state
.scenarios
.read()
.unwrap_or_else(|e| e.into_inner())
.get(name)
.cloned()
}
pub fn set_fixtures(
&self,
fixtures: Vec<Fixture>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
self.state.set_fixtures(fixtures)
}
pub fn reset(&self) {
self.state
.scenarios
.write()
.unwrap_or_else(|e| e.into_inner())
.clear();
self.state
.captured_requests
.write()
.unwrap_or_else(|e| e.into_inner())
.clear();
}
}
impl Drop for MockServer {
    /// Aborts the serving task so the server stops when the handle is dropped.
    fn drop(&mut self) {
        let serve_task = &self._handle;
        serve_task.abort();
    }
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn fixture_set_empty() {
let set = FixtureSet::default();
assert_eq!(set.len(), 0);
assert!(set.find_match(|_| true).is_none());
assert_eq!(set.iter_all().count(), 0);
assert_eq!(set.primary_iter().count(), 0);
assert_eq!(set.catch_all_iter().count(), 0);
}
#[test]
fn fixture_set_sorts_by_priority() {
let low = Arc::new(Fixture::new().respond_with_content("low"));
let high = Arc::new(
Fixture::new()
.with_priority(10)
.respond_with_content("high"),
);
let set = FixtureSet::new(vec![low, high]);
assert_eq!(set.len(), 2);
let first = set.primary_iter().next().unwrap();
assert_eq!(first.priority, Some(10));
}
#[test]
fn fixture_set_separates_catch_all() {
let normal = Arc::new(Fixture::new().respond_with_content("normal"));
let catch = Arc::new(Fixture::new().as_catch_all().respond_with_content("catch"));
let set = FixtureSet::new(vec![normal, catch]);
assert_eq!(set.primary_iter().count(), 1);
assert_eq!(set.catch_all_iter().count(), 1);
assert_eq!(set.iter_all().count(), 2);
}
#[test]
fn fixture_set_find_match_prefers_primary_over_catch_all() {
let catch = Arc::new(Fixture::new().as_catch_all().respond_with_content("catch"));
let normal = Arc::new(Fixture::new().respond_with_content("normal"));
let set = FixtureSet::new(vec![catch, normal]);
let matched = set.find_match(|_| true).unwrap();
assert!(!matched.catch_all);
}
#[test]
fn fixture_set_indexed_iterators_carry_original_index() {
let catch = Arc::new(Fixture::new().as_catch_all().respond_with_content("catch"));
let normal_a = Arc::new(Fixture::new().respond_with_content("a"));
let normal_b = Arc::new(Fixture::new().respond_with_content("b"));
let set = FixtureSet::new(vec![catch, normal_a, normal_b]);
let primary: Vec<usize> = set.primary_iter_indexed().map(|(i, _)| i).collect();
assert_eq!(primary, vec![1, 2]);
let catchall: Vec<usize> = set.catch_all_iter_indexed().map(|(i, _)| i).collect();
assert_eq!(catchall, vec![0]);
}
#[test]
fn request_outcome_label_and_status() {
assert_eq!(RequestOutcome::Matched.label(), "matched");
assert_eq!(RequestOutcome::NoFixtureMatch.label(), "no_match");
assert_eq!(RequestOutcome::BadRequest.label(), "bad_request");
assert_eq!(RequestOutcome::AuthRejected.label(), "auth_rejected");
assert_eq!(RequestOutcome::CodeEndpoint.label(), "code_endpoint");
assert_eq!(RequestOutcome::ModerationEndpoint.label(), "moderation");
assert_eq!(RequestOutcome::Matched.default_status(), 200);
assert_eq!(RequestOutcome::NoFixtureMatch.default_status(), 404);
assert_eq!(RequestOutcome::BadRequest.default_status(), 400);
assert_eq!(RequestOutcome::AuthRejected.default_status(), 401);
assert_eq!(RequestOutcome::CodeEndpoint.default_status(), 200);
assert_eq!(RequestOutcome::ModerationEndpoint.default_status(), 200);
}
#[tokio::test]
async fn should_build_and_start_server() {
let server = ServerBuilder::new()
.fixture(Fixture::new().respond_with_content("test"))
.build()
.await
.unwrap();
assert!(server.port() > 0);
assert!(server.url().starts_with("http://127.0.0.1:"));
}
#[tokio::test]
async fn should_return_404_for_unknown_routes() {
let server = ServerBuilder::new()
.fixture(Fixture::new().respond_with_content("test"))
.build()
.await
.unwrap();
let resp = reqwest::get(format!("{}/unknown", server.url()))
.await
.unwrap();
assert_eq!(resp.status(), 404);
}
#[tokio::test]
async fn should_support_custom_bind_address() {
let server = ServerBuilder::new()
.fixture(Fixture::new().respond_with_content("test"))
.bind("127.0.0.1:0")
.build()
.await
.unwrap();
assert!(server.port() > 0);
}
#[tokio::test]
async fn should_support_default_builder() {
let builder = ServerBuilder::default();
let server = builder
.fixture(Fixture::new().respond_with_content("default"))
.build()
.await
.unwrap();
assert!(server.port() > 0);
}
#[tokio::test]
async fn should_support_fixtures_vec() {
let fixtures = vec![
Fixture::new()
.match_user_message("a")
.respond_with_content("A"),
Fixture::new()
.match_user_message("b")
.respond_with_content("B"),
];
let server = ServerBuilder::new()
.fixtures(fixtures)
.build()
.await
.unwrap();
assert!(server.port() > 0);
}
#[tokio::test]
async fn should_support_verbose_mode() {
let server = ServerBuilder::new()
.fixture(Fixture::new().respond_with_content("test"))
.verbose(true)
.build()
.await
.unwrap();
assert!(server.port() > 0);
}
#[tokio::test]
async fn should_load_yaml_file() {
let dir = std::env::temp_dir().join("llmposter_server_test_yaml");
std::fs::create_dir_all(&dir).unwrap();
let file = dir.join("test.yaml");
std::fs::write(
&file,
"fixtures:\n - match:\n user_message: test\n response:\n content: loaded",
)
.unwrap();
let server = ServerBuilder::new()
.load_yaml(&file)
.unwrap()
.build()
.await
.unwrap();
assert!(server.port() > 0);
std::fs::remove_dir_all(&dir).ok();
}
/// A directory containing one YAML fixture file must be loadable via `load_yaml_dir`.
///
/// The scratch directory name includes the process id and is wiped before use: the whole
/// directory is handed to `load_yaml_dir`, so files left behind by an aborted run or written
/// by a concurrently running test binary must not be present in it.
#[tokio::test]
async fn should_load_yaml_dir() {
    let dir = std::env::temp_dir().join(format!(
        "llmposter_server_test_dir_{}",
        std::process::id()
    ));
    // Best-effort removal of leftovers; the directory normally does not exist yet.
    let _ = std::fs::remove_dir_all(&dir);
    std::fs::create_dir_all(&dir).unwrap();
    std::fs::write(
        dir.join("a.yaml"),
        "fixtures:\n - response:\n content: a",
    )
    .unwrap();
    let server = ServerBuilder::new()
        .load_yaml_dir(&dir)
        .unwrap()
        .build()
        .await
        .unwrap();
    assert!(server.port() > 0);
    std::fs::remove_dir_all(&dir).ok();
}
/// `build()` must report an error, not panic, when the bind address cannot be used.
#[tokio::test]
async fn should_return_error_when_bind_address_invalid() {
    let fixture = Fixture::new().respond_with_content("ok");
    let builder = ServerBuilder::new()
        .fixture(fixture)
        .bind("not-a-valid-address");

    let outcome = builder.build().await;

    assert!(outcome.is_err());
}
/// `load_yaml` on a path that does not exist must return `Err`.
#[tokio::test]
async fn should_return_error_when_load_yaml_file_missing() {
    let absent_file = std::path::Path::new("/nonexistent/llmposter/does-not-exist.yaml");
    let builder = ServerBuilder::new();

    let outcome = builder.load_yaml(absent_file);

    assert!(outcome.is_err());
}
/// `load_yaml_dir` on a directory that does not exist must return `Err`.
#[tokio::test]
async fn should_return_error_when_load_yaml_dir_missing() {
    let absent_dir = std::path::Path::new("/nonexistent/llmposter/does-not-exist-dir");
    let builder = ServerBuilder::new();

    let outcome = builder.load_yaml_dir(absent_dir);

    assert!(outcome.is_err());
}
/// A fixture created with `Fixture::new()` alone is expected to be rejected at build time,
/// with an error message that names the offending fixture as "Fixture #1".
#[tokio::test]
async fn should_return_error_on_invalid_fixture() {
    let incomplete = Fixture::new();

    let result = ServerBuilder::new().fixture(incomplete).build().await;

    assert!(result.is_err());
    let message = result.unwrap_err().to_string();
    assert!(message.contains("Fixture #1"));
}
/// Second 0 must render as the Unix epoch.
#[test]
fn should_format_rfc3339_unix_epoch() {
    let rendered = format_rfc3339_utc(0);
    assert_eq!(rendered, "1970-01-01T00:00:00Z");
}
/// 86 400 seconds (one day) after the epoch must render as 1970-01-02.
#[test]
fn should_format_rfc3339_one_day() {
    let rendered = format_rfc3339_utc(86400);
    assert_eq!(rendered, "1970-01-02T00:00:00Z");
}
/// The timestamp 951 782 400 must render as 29 February 2000 (a leap day).
#[test]
fn should_format_rfc3339_leap_year_feb29() {
    let rendered = format_rfc3339_utc(951_782_400);
    assert_eq!(rendered, "2000-02-29T00:00:00Z");
}
/// A timestamp one minute in the future must have the general RFC 3339 shape:
/// a `T` separator, a trailing `Z`, and exactly 20 characters.
#[test]
fn should_format_rfc3339_valid_format() {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap();
    let one_minute_ahead = since_epoch.as_secs() + 60;

    let rendered = format_rfc3339_utc(one_minute_ahead);

    assert!(rendered.ends_with('Z'));
    assert!(rendered.contains('T'));
    assert_eq!(rendered.len(), 20);
}
/// A freshly built server must report no error through `check_error`.
#[tokio::test]
async fn should_report_healthy_when_no_error() {
    let fixture = Fixture::new().respond_with_content("ok");
    let server = ServerBuilder::new().fixture(fixture).build().await.unwrap();

    let health = server.check_error().await;

    assert!(health.is_ok());
}
/// Assembles a `MockServer` by hand around the given error receiver, without binding a
/// listener: empty fixture set, counters at their literal start values, and an
/// already-completed task as the server handle.
///
/// Must be called from within a Tokio runtime because it uses `tokio::spawn`.
fn mock_server_with_err_rx(err_rx: tokio::sync::oneshot::Receiver<String>) -> MockServer {
    use std::collections::{HashMap, VecDeque};
    use std::sync::RwLock;

    let app_state = AppState {
        fixtures: RwLock::new(FixtureSet::default()),
        id_gen: IdGenerator::new(),
        verbose: false,
        request_counter: AtomicU64::new(1),
        chaos_counter: AtomicU64::new(0),
        capture_counter: AtomicU64::new(0),
        moderation_counter: AtomicU64::new(1),
        auth: None,
        scenarios: RwLock::new(HashMap::new()),
        captured_requests: RwLock::new(VecDeque::new()),
        capture_capacity: None,
        explicit_models: None,
        diagnostics: false,
        boot_instant: std::time::Instant::now(),
        boot_epoch_ms: 0,
        #[cfg(feature = "ui")]
        ui_tx: None,
    };

    MockServer {
        addr: "127.0.0.1:0".parse().unwrap(),
        // A task that finishes immediately stands in for the real serve loop.
        _handle: tokio::spawn(std::future::ready(())),
        server_error: tokio::sync::Mutex::new(err_rx),
        state: Arc::new(app_state),
        #[cfg(feature = "oauth")]
        oauth_server: None,
    }
}
/// When an error message is already queued on the error channel, `check_error` must
/// return it as `Err`.
#[tokio::test]
async fn should_report_error_when_server_task_sent_error() {
    let (err_tx, err_rx) = tokio::sync::oneshot::channel::<String>();
    let queued = "[llmposter] server error: boom".to_string();
    err_tx.send(queued).unwrap();

    let server = mock_server_with_err_rx(err_rx);
    let result = server.check_error().await;

    assert!(
        result.is_err(),
        "expected Err after tx.send, got {:?}",
        result
    );
    let msg = result.unwrap_err();
    assert!(msg.contains("boom"), "unexpected error body: {}", msg);
}
/// A sender dropped without sending anything must not be treated as a server error.
#[tokio::test]
async fn should_report_healthy_when_error_channel_closed_without_send() {
    let (err_tx, err_rx) = tokio::sync::oneshot::channel::<String>();
    drop(err_tx);

    let server = mock_server_with_err_rx(err_rx);
    let health = server.check_error().await;

    assert!(health.is_ok());
}
/// `GET /code/200` must answer 200 with a JSON body carrying the code and the text "OK".
#[tokio::test]
async fn should_return_requested_status_code() {
    let server = ServerBuilder::new()
        .fixture(Fixture::new().respond_with_content("ok"))
        .build()
        .await
        .unwrap();

    let endpoint = format!("{}/code/200", server.url());
    let response = reqwest::get(endpoint).await.unwrap();
    assert_eq!(response.status(), 200);

    let payload: serde_json::Value = response.json().await.unwrap();
    assert_eq!(payload["code"], 200);
    assert_eq!(payload["description"], "OK");
}
/// `GET /code/404` must answer 404 with a JSON body carrying the code and "Not Found".
#[tokio::test]
async fn should_return_404_status_from_code_route() {
    let server = ServerBuilder::new()
        .fixture(Fixture::new().respond_with_content("ok"))
        .build()
        .await
        .unwrap();

    let endpoint = format!("{}/code/404", server.url());
    let response = reqwest::get(endpoint).await.unwrap();
    assert_eq!(response.status(), 404);

    let payload: serde_json::Value = response.json().await.unwrap();
    assert_eq!(payload["code"], 404);
    assert_eq!(payload["description"], "Not Found");
}
/// `GET /code/500` must answer 500 and echo the code in the JSON body.
#[tokio::test]
async fn should_return_500_status_from_code_route() {
    let server = ServerBuilder::new()
        .fixture(Fixture::new().respond_with_content("ok"))
        .build()
        .await
        .unwrap();

    let endpoint = format!("{}/code/500", server.url());
    let response = reqwest::get(endpoint).await.unwrap();
    assert_eq!(response.status(), 500);

    let payload: serde_json::Value = response.json().await.unwrap();
    assert_eq!(payload["code"], 500);
}
/// `GET /code/301` must answer 301 and carry a `location: /` header.
#[tokio::test]
async fn should_add_location_header_on_redirect() {
    let server = ServerBuilder::new()
        .fixture(Fixture::new().respond_with_content("ok"))
        .build()
        .await
        .unwrap();

    // Redirect following is disabled so the 301 itself can be inspected.
    let no_follow = reqwest::redirect::Policy::none();
    let client = reqwest::Client::builder()
        .redirect(no_follow)
        .build()
        .unwrap();

    let endpoint = format!("{}/code/301", server.url());
    let response = client.get(endpoint).send().await.unwrap();

    assert_eq!(response.status(), 301);
    let location = response.headers().get("location").unwrap();
    assert_eq!(location, "/");
}
/// `GET /code/999` must be rejected with 400 and a JSON body whose code is 400.
#[tokio::test]
async fn should_return_bad_request_for_invalid_status_code() {
    let server = ServerBuilder::new()
        .fixture(Fixture::new().respond_with_content("ok"))
        .build()
        .await
        .unwrap();

    let endpoint = format!("{}/code/999", server.url());
    let response = reqwest::get(endpoint).await.unwrap();
    assert_eq!(response.status(), 400);

    let payload: serde_json::Value = response.json().await.unwrap();
    assert_eq!(payload["code"], 400);
}
/// `GET /code/204` must answer 204 with no body.
#[tokio::test]
async fn should_return_empty_body_for_204() {
    let server = ServerBuilder::new()
        .fixture(Fixture::new().respond_with_content("ok"))
        .build()
        .await
        .unwrap();

    let endpoint = format!("{}/code/204", server.url());
    let response = reqwest::get(endpoint).await.unwrap();
    assert_eq!(response.status(), 204);

    let body = response.text().await.unwrap();
    assert!(body.is_empty(), "204 should have empty body, got: {}", body);
}
/// `GET /code/304` must answer 304 with no body.
#[tokio::test]
async fn should_return_empty_body_for_304() {
    let server = ServerBuilder::new()
        .fixture(Fixture::new().respond_with_content("ok"))
        .build()
        .await
        .unwrap();

    let endpoint = format!("{}/code/304", server.url());
    let response = reqwest::get(endpoint).await.unwrap();
    assert_eq!(response.status(), 304);

    let body = response.text().await.unwrap();
    assert!(body.is_empty(), "304 should have empty body, got: {}", body);
}
/// `GET /code/205` must answer 205 with no body.
#[tokio::test]
async fn should_return_empty_body_for_205() {
    let server = ServerBuilder::new()
        .fixture(Fixture::new().respond_with_content("ok"))
        .build()
        .await
        .unwrap();

    let endpoint = format!("{}/code/205", server.url());
    let response = reqwest::get(endpoint).await.unwrap();
    assert_eq!(response.status(), 205);

    let body = response.text().await.unwrap();
    assert!(body.is_empty(), "205 should have empty body, got: {}", body);
}
/// Exercises `GET /code/100` and only requires that the request itself completes.
///
/// NOTE(review): despite the name, nothing is asserted about the status or the body; the
/// result of reading the body is deliberately discarded. Confirm whether that is intended.
#[tokio::test]
async fn should_return_empty_body_for_1xx_status() {
    let server = ServerBuilder::new()
        .fixture(Fixture::new().respond_with_content("ok"))
        .build()
        .await
        .unwrap();

    let endpoint = format!("{}/code/100", server.url());
    let response = reqwest::get(endpoint).await.unwrap();

    let _ = response.text().await;
}
/// Produces a `Weak<AppState>` whose only strong reference has already been dropped, so
/// `upgrade()` on the returned handle yields `None`.
#[cfg(any(feature = "watch", unix))]
fn dead_weak_state() -> std::sync::Weak<AppState> {
    use std::collections::{HashMap, VecDeque};
    use std::sync::RwLock;

    let short_lived = Arc::new(AppState {
        fixtures: RwLock::new(FixtureSet::default()),
        id_gen: IdGenerator::new(),
        verbose: false,
        request_counter: AtomicU64::new(1),
        chaos_counter: AtomicU64::new(0),
        capture_counter: AtomicU64::new(0),
        moderation_counter: AtomicU64::new(1),
        auth: None,
        scenarios: RwLock::new(HashMap::new()),
        captured_requests: RwLock::new(VecDeque::new()),
        capture_capacity: None,
        explicit_models: None,
        diagnostics: false,
        boot_instant: std::time::Instant::now(),
        boot_epoch_ms: 0,
        #[cfg(feature = "ui")]
        ui_tx: None,
    });

    let dangling = Arc::downgrade(&short_lived);
    // Dropping the sole strong reference is what makes the weak handle dead.
    drop(short_lived);
    dangling
}
/// Spawns the file watcher with an already-dead `Weak<AppState>`, modifies a watched file,
/// and expects the watcher thread to have finished shortly afterwards.
#[cfg(feature = "watch")]
#[tokio::test]
async fn should_exit_file_watcher_thread_when_state_is_dropped() {
// Process-unique scratch directory, wiped first in case a previous run left it behind.
let dir = std::env::temp_dir().join(format!(
"llmposter_watcher_dead_weak_{}",
std::process::id()
));
let _ = std::fs::remove_dir_all(&dir);
std::fs::create_dir_all(&dir).unwrap();
let file = dir.join("a.yaml");
std::fs::write(
&file,
"fixtures:\n - match:\n user_message: hi\n response:\n content: v1\n",
)
.unwrap();
let handle = spawn_file_watcher(dead_weak_state(), vec![dir.clone()], false)
.expect("watcher should spawn successfully");
// Presumably gives the watcher time to start observing the directory — TODO confirm.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// Rewrite the file with different content to provoke a change notification.
std::fs::write(
&file,
"fixtures:\n - match:\n user_message: hi\n response:\n content: v2\n",
)
.unwrap();
// NOTE(review): fixed wait, presumably sized to exceed the watcher's debounce/poll
// interval — confirm against spawn_file_watcher.
tokio::time::sleep(std::time::Duration::from_millis(600)).await;
assert!(
handle.is_finished(),
"watcher thread should have exited via dead-weak upgrade path"
);
handle.join().expect("watcher thread should exit cleanly");
// Best-effort cleanup of the scratch directory.
let _ = std::fs::remove_dir_all(&dir);
}
/// With a dead `Weak<AppState>` and a still-open channel, `watcher_loop` must return in
/// well under 100 ms.
#[cfg(feature = "watch")]
#[test]
fn should_return_from_watcher_loop_immediately_when_state_is_dead() {
    type WatchEvent = Result<
        Vec<notify_debouncer_mini::DebouncedEvent>,
        notify_debouncer_mini::notify::Error,
    >;

    // The sender is kept alive so the channel never reports a disconnect.
    let (_tx, rx) = std::sync::mpsc::channel::<WatchEvent>();
    let budget = std::time::Duration::from_millis(100);

    let started = std::time::Instant::now();
    watcher_loop(&dead_weak_state(), &[], false, &rx);
    let elapsed = started.elapsed();

    assert!(
        elapsed < budget,
        "dead-weak watcher_loop should return near-instantly, took {:?}",
        elapsed
    );
}
/// Feeds `watcher_loop` one synthetic notify error followed by a disconnected channel and
/// expects it to return in under 200 ms while the state is still alive.
#[cfg(feature = "watch")]
#[test]
fn should_log_and_continue_on_watcher_notify_error() {
    use std::collections::{HashMap, VecDeque};
    use std::sync::RwLock;

    type WatchEvent = Result<
        Vec<notify_debouncer_mini::DebouncedEvent>,
        notify_debouncer_mini::notify::Error,
    >;

    let state = Arc::new(AppState {
        fixtures: RwLock::new(FixtureSet::default()),
        id_gen: IdGenerator::new(),
        verbose: false,
        request_counter: AtomicU64::new(1),
        chaos_counter: AtomicU64::new(0),
        capture_counter: AtomicU64::new(0),
        moderation_counter: AtomicU64::new(1),
        auth: None,
        scenarios: RwLock::new(HashMap::new()),
        captured_requests: RwLock::new(VecDeque::new()),
        capture_capacity: None,
        explicit_models: None,
        diagnostics: false,
        boot_instant: std::time::Instant::now(),
        boot_epoch_ms: 0,
        #[cfg(feature = "ui")]
        ui_tx: None,
    });
    let weak = Arc::downgrade(&state);

    let (tx, rx) = std::sync::mpsc::channel::<WatchEvent>();
    let synthetic = notify_debouncer_mini::notify::Error::generic("synthetic test error");
    tx.send(Err(synthetic)).expect("channel alive");
    // Closing the sending side leaves exactly one queued item followed by a disconnect.
    drop(tx);

    let started = std::time::Instant::now();
    watcher_loop(&weak, &[], false, &rx);
    let elapsed = started.elapsed();

    assert!(
        elapsed < std::time::Duration::from_millis(200),
        "watcher_loop should drain the error + disconnect near-instantly, took {:?}",
        elapsed
    );
    // The strong reference is held until here so the weak handle stays upgradable throughout.
    drop(state);
}
/// `log_watcher_setup_failure` must yield `None` for any notify error.
#[cfg(feature = "watch")]
#[test]
fn should_log_and_return_none_on_watcher_setup_failure() {
    let synthetic = notify_debouncer_mini::notify::Error::generic("synthetic");

    let outcome = log_watcher_setup_failure(synthetic);

    assert!(outcome.is_none());
}
/// Smoke test: `log_sighup_setup_failure` must accept an I/O error without panicking.
#[cfg(unix)]
#[test]
fn should_log_sighup_setup_failure() {
    let synthetic = std::io::Error::other("synthetic");
    log_sighup_setup_failure(synthetic);
}
/// Smoke test: a `JoinError` produced by a panicking blocking task must be accepted by
/// `log_sighup_reload_panic` without panicking itself.
#[cfg(unix)]
#[tokio::test]
async fn should_log_sighup_reload_panic() {
    let panicking_task = tokio::task::spawn_blocking(|| panic!("synthetic panic for test"));

    let join_error = panicking_task
        .await
        .expect_err("spawn_blocking should propagate panic");

    log_sighup_reload_panic(join_error);
}
/// Runs `watcher_loop` on a worker thread with a live state and an idle channel, then drops
/// the state and expects the loop to terminate within the polling deadline.
#[cfg(feature = "watch")]
#[test]
fn should_loop_past_recv_timeout_when_state_is_alive() {
let state = Arc::new(AppState {
fixtures: std::sync::RwLock::new(FixtureSet::default()),
id_gen: IdGenerator::new(),
verbose: false,
request_counter: AtomicU64::new(1),
chaos_counter: AtomicU64::new(0),
capture_counter: AtomicU64::new(0),
moderation_counter: AtomicU64::new(1),
auth: None,
scenarios: std::sync::RwLock::new(std::collections::HashMap::new()),
captured_requests: std::sync::RwLock::new(std::collections::VecDeque::new()),
capture_capacity: None,
explicit_models: None,
diagnostics: false,
boot_instant: std::time::Instant::now(),
boot_epoch_ms: 0,
#[cfg(feature = "ui")]
ui_tx: None,
});
let weak = Arc::downgrade(&state);
let (tx, rx) = std::sync::mpsc::channel::<
Result<
Vec<notify_debouncer_mini::DebouncedEvent>,
notify_debouncer_mini::notify::Error,
>,
>();
// The loop runs on its own thread; nothing is ever sent on `tx`.
let worker = std::thread::spawn(move || {
watcher_loop(&weak, &[], false, &rx);
});
// NOTE(review): presumably longer than watcher_loop's receive timeout, so that at least
// one timeout happens while the state is still alive — confirm against watcher_loop.
std::thread::sleep(std::time::Duration::from_millis(650));
// From here on the weak handle can no longer be upgraded.
drop(state);
// Poll for completion rather than joining, so a stuck loop fails the test instead of hanging it.
let deadline = std::time::Instant::now() + std::time::Duration::from_millis(1200);
while !worker.is_finished() && std::time::Instant::now() < deadline {
std::thread::sleep(std::time::Duration::from_millis(20));
}
assert!(
worker.is_finished(),
"watcher_loop should exit once state drops after hitting the Timeout arm"
);
worker.join().expect("watcher thread should exit cleanly");
// The sender is dropped only now, so the channel was never disconnected during the test.
drop(tx);
}
/// With a live state but an already-disconnected channel, `watcher_loop` must return in
/// well under 100 ms.
#[cfg(feature = "watch")]
#[test]
fn should_return_from_watcher_loop_when_sender_disconnects() {
    use std::collections::{HashMap, VecDeque};
    use std::sync::RwLock;

    type WatchEvent = Result<
        Vec<notify_debouncer_mini::DebouncedEvent>,
        notify_debouncer_mini::notify::Error,
    >;

    let state = Arc::new(AppState {
        fixtures: RwLock::new(FixtureSet::default()),
        id_gen: IdGenerator::new(),
        verbose: false,
        request_counter: AtomicU64::new(1),
        chaos_counter: AtomicU64::new(0),
        capture_counter: AtomicU64::new(0),
        moderation_counter: AtomicU64::new(1),
        auth: None,
        scenarios: RwLock::new(HashMap::new()),
        captured_requests: RwLock::new(VecDeque::new()),
        capture_capacity: None,
        explicit_models: None,
        diagnostics: false,
        boot_instant: std::time::Instant::now(),
        boot_epoch_ms: 0,
        #[cfg(feature = "ui")]
        ui_tx: None,
    });
    let weak = Arc::downgrade(&state);

    let (tx, rx) = std::sync::mpsc::channel::<WatchEvent>();
    // Disconnect the channel before the loop ever runs.
    drop(tx);

    let started = std::time::Instant::now();
    watcher_loop(&weak, &[], false, &rx);
    let elapsed = started.elapsed();

    assert!(
        elapsed < std::time::Duration::from_millis(100),
        "disconnected watcher_loop should return near-instantly, took {:?}",
        elapsed
    );
    // The strong reference is held until here so the weak handle stays upgradable throughout.
    drop(state);
}
#[tokio::test]
async fn should_recover_from_poisoned_locks_in_mock_server_accessors() {
let state = Arc::new(AppState {
fixtures: std::sync::RwLock::new(FixtureSet::default()),
id_gen: IdGenerator::new(),
verbose: false,
request_counter: AtomicU64::new(1),
chaos_counter: AtomicU64::new(0),
capture_counter: AtomicU64::new(0),
moderation_counter: AtomicU64::new(1),
auth: None,
scenarios: std::sync::RwLock::new(std::collections::HashMap::new()),
captured_requests: std::sync::RwLock::new(std::collections::VecDeque::new()),
capture_capacity: None,
explicit_models: None,
diagnostics: false,
boot_instant: std::time::Instant::now(),
boot_epoch_ms: 0,
#[cfg(feature = "ui")]
ui_tx: None,
});
let s = state.clone();
let _ = std::thread::spawn(move || {
let _g = s.fixtures.write().unwrap();
panic!("intentional poison for test");
})
.join();
let s = state.clone();
let _ = std::thread::spawn(move || {
let _g = s.scenarios.write().unwrap();
panic!("intentional poison for test");
})
.join();
let s = state.clone();
let _ = std::thread::spawn(move || {
let _g = s.captured_requests.write().unwrap();
panic!("intentional poison for test");
})
.join();
assert!(state.fixtures.is_poisoned());
assert!(state.scenarios.is_poisoned());
assert!(state.captured_requests.is_poisoned());
let (_tx, rx) = tokio::sync::oneshot::channel::<String>();
let server = MockServer {
addr: "127.0.0.1:0".parse().unwrap(),
_handle: tokio::spawn(std::future::ready(())),
server_error: tokio::sync::Mutex::new(rx),
state: state.clone(),
#[cfg(feature = "oauth")]
oauth_server: None,
};
assert_eq!(server.get_requests().len(), 0);
assert_eq!(server.request_count(), 0);
assert!(server.scenario_state("missing").is_none());
server.reset();
server
.set_fixtures(vec![Fixture::new().respond_with_content("after-poison")])
.expect("poisoned-lock swap should still succeed");
}
/// An `Err` passed to `report_serve_result` must arrive on the oneshot channel, framed with
/// the `[llmposter] server error` prefix and containing the original cause.
#[tokio::test]
async fn should_forward_serve_error_through_oneshot() {
    let (err_tx, err_rx) = tokio::sync::oneshot::channel::<String>();
    let failure = std::io::Error::new(std::io::ErrorKind::ConnectionAborted, "listener gone");

    report_serve_result(Err(failure), err_tx);

    let msg = err_rx
        .await
        .expect("sender should have forwarded the error");
    assert!(
        msg.contains("[llmposter] server error"),
        "unexpected framing: {}",
        msg
    );
    assert!(msg.contains("listener gone"), "missing cause: {}", msg);
}
#[tokio::test]
async fn should_not_send_when_serve_returns_ok() {
let (tx, mut rx) = tokio::sync::oneshot::channel::<String>();
report_serve_result(Ok(()), tx);
let err = rx.try_recv().unwrap_err();
assert_eq!(err, tokio::sync::oneshot::error::TryRecvError::Closed);
}
/// Starts the SIGHUP handler with an already-dead `Weak<AppState>`, raises SIGHUP against
/// the test process itself, and expects the handler task to have finished shortly after.
#[cfg(unix)]
#[tokio::test]
async fn should_exit_sighup_handler_when_state_is_dropped() {
    let handle = spawn_sighup_handler(dead_weak_state(), vec![], false);
    // Presumably lets the spawned task get scheduled before the signal is raised — TODO confirm.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    // SAFETY: `getpid` and `kill` take only plain integer arguments and touch no memory
    // owned by this program; the signal targets the current process only.
    let rc = unsafe { libc::kill(libc::getpid(), libc::SIGHUP) };
    // A failed delivery would otherwise surface only as a confusing "not finished" assertion.
    assert_eq!(rc, 0, "failed to raise SIGHUP against the test process");

    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    assert!(
        handle.is_finished(),
        "SIGHUP handler should have exited via dead-weak upgrade path"
    );
    handle.await.expect("SIGHUP handler should exit cleanly");
}
/// With no explicit model list, `/v1/models` is expected to list each distinct
/// `match_model` value once, in first-seen order, ignoring the fixture without a model.
#[tokio::test]
async fn should_auto_derive_models_from_fixtures() {
    let gpt_first = Fixture::new()
        .match_model("gpt-4")
        .respond_with_content("a");
    let claude = Fixture::new()
        .match_model("claude-sonnet-4-6")
        .respond_with_content("b");
    // Same model as the first fixture: must not produce a second entry.
    let gpt_again = Fixture::new()
        .match_model("gpt-4")
        .respond_with_content("c");
    // No model constraint at all.
    let unconstrained = Fixture::new().respond_with_content("d");

    let server = ServerBuilder::new()
        .fixture(gpt_first)
        .fixture(claude)
        .fixture(gpt_again)
        .fixture(unconstrained)
        .build()
        .await
        .unwrap();

    let endpoint = format!("{}/v1/models", server.url());
    let response = reqwest::get(endpoint).await.unwrap();
    assert_eq!(response.status(), 200);

    let body: serde_json::Value = response.json().await.unwrap();
    assert_eq!(body["object"], "list");
    let data = body["data"].as_array().unwrap();
    assert_eq!(data.len(), 2);
    assert_eq!(data[0]["id"], "gpt-4");
    assert_eq!(data[1]["id"], "claude-sonnet-4-6");
    assert_eq!(data[0]["owned_by"], "llmposter");
}
/// A fixture that matches only on the user message must not contribute an entry to
/// `/v1/models`.
#[tokio::test]
async fn should_skip_fixture_with_no_model_constraint_when_auto_deriving_models() {
    let with_model = Fixture::new()
        .match_model("gpt-4")
        .respond_with_content("a");
    let message_only = Fixture::new()
        .match_user_message("hello")
        .respond_with_content("b");

    let server = ServerBuilder::new()
        .fixture(with_model)
        .fixture(message_only)
        .build()
        .await
        .unwrap();

    let endpoint = format!("{}/v1/models", server.url());
    let response = reqwest::get(endpoint).await.unwrap();
    assert_eq!(response.status(), 200);

    let body: serde_json::Value = response.json().await.unwrap();
    let data = body["data"].as_array().unwrap();
    assert_eq!(data.len(), 1);
    assert_eq!(data[0]["id"], "gpt-4");
}
/// An explicit `models(...)` list must replace whatever would be derived from fixtures.
#[tokio::test]
async fn should_use_explicit_models_over_auto_derived() {
    let fixture = Fixture::new()
        .match_model("gpt-4")
        .respond_with_content("a");
    let explicit = vec!["custom-model".to_string(), "other-model".to_string()];

    let server = ServerBuilder::new()
        .fixture(fixture)
        .models(explicit)
        .build()
        .await
        .unwrap();

    let endpoint = format!("{}/v1/models", server.url());
    let response = reqwest::get(endpoint).await.unwrap();

    let body: serde_json::Value = response.json().await.unwrap();
    let data = body["data"].as_array().unwrap();
    assert_eq!(data.len(), 2);
    assert_eq!(data[0]["id"], "custom-model");
    assert_eq!(data[1]["id"], "other-model");
}
/// An explicitly empty model list must be served as empty, not replaced by derived models.
#[tokio::test]
async fn should_serve_empty_models_list_when_explicit_vec_is_empty() {
    let fixture = Fixture::new()
        .match_model("gpt-4")
        .respond_with_content("a");

    let server = ServerBuilder::new()
        .fixture(fixture)
        .models(vec![])
        .build()
        .await
        .unwrap();

    let endpoint = format!("{}/v1/models", server.url());
    let response = reqwest::get(endpoint).await.unwrap();

    let body: serde_json::Value = response.json().await.unwrap();
    let data = body["data"].as_array().unwrap();
    assert!(data.is_empty());
}
/// `explicit_models()` must hand back the list given to the builder, in order.
#[tokio::test]
async fn should_expose_explicit_models_via_accessor() {
    let configured = vec!["model-a".to_string(), "model-b".to_string()];
    let server = ServerBuilder::new()
        .fixture(Fixture::new().respond_with_content("a"))
        .models(configured)
        .build()
        .await
        .unwrap();

    let models = server.explicit_models().expect("models should be set");

    assert_eq!(models, &["model-a", "model-b"]);
}
/// Without a call to `models(...)`, `explicit_models()` must be `None`.
#[tokio::test]
async fn should_return_none_for_explicit_models_when_not_set() {
    let fixture = Fixture::new().respond_with_content("a");
    let server = ServerBuilder::new().fixture(fixture).build().await.unwrap();

    let models = server.explicit_models();

    assert!(models.is_none());
}
/// `GET /health` must answer 200 with `{"status": "ok"}` in the body.
#[tokio::test]
async fn should_return_health_ok() {
    let fixture = Fixture::new().respond_with_content("a");
    let server = ServerBuilder::new().fixture(fixture).build().await.unwrap();

    let endpoint = format!("{}/health", server.url());
    let response = reqwest::get(endpoint).await.unwrap();
    assert_eq!(response.status(), 200);

    let body: serde_json::Value = response.json().await.unwrap();
    assert_eq!(body["status"], "ok");
}
/// `POST /v1/moderations` with a string input must answer 200 with an unflagged result,
/// an id prefixed `modr-llmposter-`, and a near-zero `hate` score.
#[tokio::test]
async fn should_return_moderation_response() {
    let fixture = Fixture::new().respond_with_content("a");
    let server = ServerBuilder::new().fixture(fixture).build().await.unwrap();

    let endpoint = format!("{}/v1/moderations", server.url());
    let request_body = serde_json::json!({"input": "hello world"});
    let response = reqwest::Client::new()
        .post(endpoint)
        .json(&request_body)
        .send()
        .await
        .unwrap();
    assert_eq!(response.status(), 200);

    let body: serde_json::Value = response.json().await.unwrap();
    let first_result = &body["results"][0];
    assert_eq!(first_result["flagged"], false);

    let id = body["id"].as_str().unwrap();
    assert!(id.starts_with("modr-llmposter-"));

    assert!(first_result["categories"]["hate"] == false);
    let hate_score = first_result["category_scores"]["hate"].as_f64().unwrap();
    assert!(hate_score < 0.01);
}
/// A non-JSON body posted to `/v1/moderations` must be rejected with 400.
#[tokio::test]
async fn should_reject_invalid_json_on_moderations() {
    let fixture = Fixture::new().respond_with_content("a");
    let server = ServerBuilder::new().fixture(fixture).build().await.unwrap();

    let endpoint = format!("{}/v1/moderations", server.url());
    let response = reqwest::Client::new()
        .post(endpoint)
        .body("not json")
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 400);
}
/// A JSON object without an `input` key posted to `/v1/moderations` must be rejected with 400.
#[tokio::test]
async fn should_reject_moderations_missing_input() {
    let fixture = Fixture::new().respond_with_content("a");
    let server = ServerBuilder::new().fixture(fixture).build().await.unwrap();

    let endpoint = format!("{}/v1/moderations", server.url());
    let request_body = serde_json::json!({});
    let response = reqwest::Client::new()
        .post(endpoint)
        .json(&request_body)
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 400);
}
/// An `input` given as an array of strings must be accepted by `/v1/moderations`.
#[tokio::test]
async fn should_accept_moderations_array_input() {
    let fixture = Fixture::new().respond_with_content("a");
    let server = ServerBuilder::new().fixture(fixture).build().await.unwrap();

    let endpoint = format!("{}/v1/moderations", server.url());
    let request_body = serde_json::json!({"input": ["first", "second"]});
    let response = reqwest::Client::new()
        .post(endpoint)
        .json(&request_body)
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 200);
}
/// An `input` given as an empty array must be rejected by `/v1/moderations` with 400.
#[tokio::test]
async fn should_reject_moderations_empty_array_input() {
    let fixture = Fixture::new().respond_with_content("a");
    let server = ServerBuilder::new().fixture(fixture).build().await.unwrap();

    let endpoint = format!("{}/v1/moderations", server.url());
    let request_body = serde_json::json!({"input": []});
    let response = reqwest::Client::new()
        .post(endpoint)
        .json(&request_body)
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 400);
}
/// When a fixture matches the embedding input, `/v1/embeddings` must return exactly the
/// vector configured on that fixture.
#[tokio::test]
async fn should_return_embedding_with_fixture() {
    let fixture = Fixture::new()
        .match_user_message("test input")
        .respond_with_embedding(vec![0.1, 0.2, 0.3]);
    let server = ServerBuilder::new().fixture(fixture).build().await.unwrap();

    let endpoint = format!("{}/v1/embeddings", server.url());
    let request_body =
        serde_json::json!({"model": "text-embedding-ada-002", "input": "test input"});
    let response = reqwest::Client::new()
        .post(endpoint)
        .json(&request_body)
        .send()
        .await
        .unwrap();
    assert_eq!(response.status(), 200);

    let body: serde_json::Value = response.json().await.unwrap();
    assert_eq!(body["object"], "list");
    let embedding = body["data"][0]["embedding"].as_array().unwrap();
    assert_eq!(embedding.len(), 3);
    let first_component = embedding[0].as_f64().unwrap();
    assert!((first_component - 0.1).abs() < 1e-10);
}
/// Without an embedding fixture, `/v1/embeddings` is expected to return a 1536-element
/// vector, and the same input must produce the same vector on a second request.
#[tokio::test]
async fn should_return_fake_embedding_without_fixture() {
    let fixture = Fixture::new().respond_with_content("catch all");
    let server = ServerBuilder::new().fixture(fixture).build().await.unwrap();
    let endpoint = format!("{}/v1/embeddings", server.url());

    // First request.
    let first_response = reqwest::Client::new()
        .post(endpoint.clone())
        .json(&serde_json::json!({"model": "text-embedding-ada-002", "input": "hello"}))
        .send()
        .await
        .unwrap();
    assert_eq!(first_response.status(), 200);
    let first_body: serde_json::Value = first_response.json().await.unwrap();
    let first_embedding = first_body["data"][0]["embedding"].as_array().unwrap();
    assert_eq!(first_embedding.len(), 1536);

    // Second, identical request issued through a fresh client.
    let second_response = reqwest::Client::new()
        .post(endpoint)
        .json(&serde_json::json!({"model": "text-embedding-ada-002", "input": "hello"}))
        .send()
        .await
        .unwrap();
    let second_body: serde_json::Value = second_response.json().await.unwrap();
    let second_embedding = second_body["data"][0]["embedding"].as_array().unwrap();

    assert_eq!(first_embedding, second_embedding);
}
/// With diagnostics on, a 404 for an unmatched chat request must carry a `nearest_match`
/// report showing which fixture fields passed (`user_message`) and which failed (`model`).
#[tokio::test]
async fn should_include_nearest_match_when_diagnostics_enabled() {
    let fixture = Fixture::new()
        .match_user_message("weather")
        .match_model("gpt-4")
        .respond_with_content("sunny");
    let server = ServerBuilder::new()
        .fixture(fixture)
        .diagnostics(true)
        .build()
        .await
        .unwrap();

    // Same user message as the fixture, but a different model: one field passes, one fails.
    let request_body = serde_json::json!({
        "model": "claude-3",
        "messages": [{"role": "user", "content": "weather"}]
    });
    let endpoint = format!("{}/v1/chat/completions", server.url());
    let response = reqwest::Client::new()
        .post(endpoint)
        .json(&request_body)
        .send()
        .await
        .unwrap();
    assert_eq!(response.status(), 404);

    let body: serde_json::Value = response.json().await.unwrap();
    let nearest = &body["error"]["nearest_match"];
    assert_eq!(nearest["fixture_index"], 0);
    assert_eq!(nearest["total_fields"], 2);

    let fields = nearest["fields"].as_array().unwrap();
    let user_message_field = fields
        .iter()
        .find(|entry| entry["field"] == "user_message")
        .unwrap();
    assert_eq!(user_message_field["passed"], true);

    let model_field = fields
        .iter()
        .find(|entry| entry["field"] == "model")
        .unwrap();
    assert_eq!(model_field["passed"], false);
}
/// Without diagnostics, a 404 for an unmatched chat request must not include `nearest_match`.
#[tokio::test]
async fn should_omit_nearest_match_when_diagnostics_disabled() {
    let fixture = Fixture::new()
        .match_user_message("weather")
        .respond_with_content("sunny");
    let server = ServerBuilder::new().fixture(fixture).build().await.unwrap();

    let request_body = serde_json::json!({
        "model": "gpt-4",
        "messages": [{"role": "user", "content": "other"}]
    });
    let endpoint = format!("{}/v1/chat/completions", server.url());
    let response = reqwest::Client::new()
        .post(endpoint)
        .json(&request_body)
        .send()
        .await
        .unwrap();
    assert_eq!(response.status(), 404);

    let body: serde_json::Value = response.json().await.unwrap();
    let nearest = &body["error"]["nearest_match"];
    assert!(nearest.is_null());
}
/// When no fixture constrains the model, `/v1/models` must return an empty `data` array.
#[tokio::test]
async fn should_return_empty_models_when_no_fixtures_have_model() {
    let fixture = Fixture::new().respond_with_content("a");
    let server = ServerBuilder::new().fixture(fixture).build().await.unwrap();

    let endpoint = format!("{}/v1/models", server.url());
    let response = reqwest::get(endpoint).await.unwrap();

    let body: serde_json::Value = response.json().await.unwrap();
    let listed = body["data"].as_array().unwrap();
    assert!(listed.is_empty());
}
/// After one matching and one non-matching chat request, the server must count both as
/// requests but only the first as matched, and the `assert_*matched` helpers must agree.
#[tokio::test]
async fn should_filter_matched_requests() {
    let fixture = Fixture::new()
        .match_user_message("hello")
        .respond_with_content("hi");
    let server = ServerBuilder::new().fixture(fixture).build().await.unwrap();

    let url = format!("{}/v1/chat/completions", server.url());
    let client = reqwest::Client::new();

    // Matches the fixture.
    let matching = serde_json::json!({"model":"m","messages":[{"role":"user","content":"hello"}]});
    client.post(&url).json(&matching).send().await.unwrap();

    // Matches nothing.
    let unmatched = serde_json::json!({"model":"m","messages":[{"role":"user","content":"other"}]});
    client.post(&url).json(&unmatched).send().await.unwrap();

    assert_eq!(server.request_count(), 2);
    assert_eq!(server.matched_count(), 1);
    assert_eq!(server.matched_requests().len(), 1);
    server.assert_matched("hello");
    server.assert_not_matched("other");
}
/// `assert_matched` must panic (mentioning its own name) when no such request was received.
#[tokio::test]
#[should_panic(expected = "assert_matched")]
async fn should_panic_on_assert_matched_miss() {
    let fixture = Fixture::new().respond_with_content("hi");
    let server = ServerBuilder::new().fixture(fixture).build().await.unwrap();

    // No request is ever sent, so this is expected to panic.
    server.assert_matched("never-sent");
}
/// `assert_not_matched` must panic (mentioning its own name) when such a request did match.
#[tokio::test]
#[should_panic(expected = "assert_not_matched")]
async fn should_panic_on_assert_not_matched_hit() {
    let fixture = Fixture::new()
        .match_user_message("hello")
        .respond_with_content("hi");
    let server = ServerBuilder::new().fixture(fixture).build().await.unwrap();

    let url = format!("{}/v1/chat/completions", server.url());
    let request_body =
        serde_json::json!({"model":"m","messages":[{"role":"user","content":"hello"}]});
    reqwest::Client::new()
        .post(&url)
        .json(&request_body)
        .send()
        .await
        .unwrap();

    // The request above matched the fixture, so this is expected to panic.
    server.assert_not_matched("hello");
}
/// With a dead `Weak<AppState>` and no signal delivered, the SIGHUP handler task must
/// finish on its own within the 1.2 s polling window.
#[cfg(unix)]
#[tokio::test]
async fn should_exit_sighup_handler_within_1s_of_state_drop() {
    let handle = spawn_sighup_handler(dead_weak_state(), vec![], false);

    let window = std::time::Duration::from_millis(1200);
    let poll_every = std::time::Duration::from_millis(50);
    let started = std::time::Instant::now();
    loop {
        if handle.is_finished() || started.elapsed() >= window {
            break;
        }
        tokio::time::sleep(poll_every).await;
    }

    assert!(
        handle.is_finished(),
        "SIGHUP handler should exit within 1.2s of a dead Weak<AppState>, no signal needed"
    );
    handle.await.expect("SIGHUP handler should exit cleanly");
}
}