use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use axum::{
Router,
extract::{Query, State},
http::{HeaderMap, StatusCode, header},
response::IntoResponse,
routing::{get, post},
};
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use crate::mock::dispatch::dispatch;
use crate::mock::fault_injection::{FaultInjector, PendingFault};
use crate::mock::state::{ChangeHook, DeviceState, MockState};
use crate::mock::{auth, helpers, snapshot};
const SOAP_CT: &str = "application/soap+xml; charset=utf-8";
struct Ctx {
base: String,
state: MockState,
faults: FaultInjector,
enforce_auth: bool,
}
#[derive(Default)]
pub struct MockServerBuilder {
port: u16,
initial_state: Option<DeviceState>,
on_change: Option<ChangeHook>,
enforce_auth: bool,
}
impl MockServerBuilder {
pub fn port(mut self, port: u16) -> Self {
self.port = port;
self
}
pub fn initial_state(mut self, state: DeviceState) -> Self {
self.initial_state = Some(state);
self
}
pub fn on_change(mut self, hook: ChangeHook) -> Self {
self.on_change = Some(hook);
self
}
pub fn enforce_auth(mut self, yes: bool) -> Self {
self.enforce_auth = yes;
self
}
pub async fn start(self) -> std::io::Result<MockServer> {
let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], self.port))).await?;
let local = listener.local_addr()?;
let base = format!("http://{local}");
let mut state = match self.initial_state {
Some(s) => MockState::with_state(s),
None => MockState::new(),
};
if let Some(hook) = self.on_change {
state.set_on_change(hook);
}
let ctx = Arc::new(Ctx {
base: base.clone(),
state,
faults: FaultInjector::new(),
enforce_auth: self.enforce_auth,
});
let app = Router::new()
.route("/mock/snapshot.jpg", get(handle_snapshot))
.route("/admin/inject_fault", post(handle_inject_fault))
.route("/admin/clear_faults", post(handle_clear_faults))
.route("/{*path}", post(handle_soap))
.with_state(ctx.clone());
let (tx, rx) = oneshot::channel::<()>();
tokio::spawn(async move {
let _ = axum::serve(listener, app)
.with_graceful_shutdown(async move {
let _ = rx.await;
})
.await;
});
Ok(MockServer {
device_url: format!("{base}/onvif/device"),
base,
port: local.port(),
ctx,
shutdown: Some(tx),
})
}
}
pub struct MockServer {
device_url: String,
base: String,
port: u16,
ctx: Arc<Ctx>,
shutdown: Option<oneshot::Sender<()>>,
}
impl MockServer {
pub async fn start() -> std::io::Result<Self> {
MockServerBuilder::default().start().await
}
pub fn builder() -> MockServerBuilder {
MockServerBuilder::default()
}
pub fn device_url(&self) -> &str {
&self.device_url
}
pub fn base_url(&self) -> &str {
&self.base
}
pub fn port(&self) -> u16 {
self.port
}
pub fn device(&self) -> &MockState {
&self.ctx.state
}
pub fn inject_fault(
&self,
action_suffix: impl Into<String>,
code: impl Into<String>,
reason: impl Into<String>,
) {
self.ctx.faults.inject(PendingFault {
action_suffix: action_suffix.into(),
code: code.into(),
reason: reason.into(),
});
}
pub fn clear_faults(&self) {
self.ctx.faults.clear_all();
}
}
impl Drop for MockServer {
fn drop(&mut self) {
if let Some(tx) = self.shutdown.take() {
let _ = tx.send(());
}
}
}
async fn handle_soap(
State(ctx): State<Arc<Ctx>>,
headers: HeaderMap,
body: axum::body::Bytes,
) -> impl IntoResponse {
let action = helpers::extract_action(&headers).unwrap_or_default();
let body_str = String::from_utf8_lossy(&body);
if let Some(f) = ctx.faults.take_for_action(&action) {
return (
StatusCode::OK,
[(header::CONTENT_TYPE, SOAP_CT)],
helpers::resp_soap_fault(&f.code, &f.reason),
);
}
if ctx.enforce_auth && auth::requires_auth(&action) {
if let Err(reason) = auth::validate_ws_security(&body_str, &ctx.state) {
return (
StatusCode::OK,
[(header::CONTENT_TYPE, SOAP_CT)],
auth::auth_fault(&reason),
);
}
}
let xml = dispatch(&action, &ctx.base, &ctx.state, &body_str);
(StatusCode::OK, [(header::CONTENT_TYPE, SOAP_CT)], xml)
}
async fn handle_snapshot(State(ctx): State<Arc<Ctx>>) -> impl IntoResponse {
let bmp = snapshot::generate_test_bmp(&ctx.state);
(
StatusCode::OK,
[
(header::CONTENT_TYPE, "image/bmp"),
(header::CACHE_CONTROL, "no-cache, no-store"),
],
bmp,
)
}
async fn handle_inject_fault(
State(ctx): State<Arc<Ctx>>,
Query(params): Query<HashMap<String, String>>,
) -> impl IntoResponse {
let action_suffix = params.get("action").cloned().unwrap_or_default();
if action_suffix.is_empty() {
return (
StatusCode::BAD_REQUEST,
"missing required 'action' query parameter\n".to_string(),
);
}
let code = params
.get("code")
.cloned()
.unwrap_or_else(|| "s:Receiver".to_string());
let reason = params
.get("reason")
.cloned()
.unwrap_or_else(|| "Injected fault".to_string());
ctx.faults.inject(PendingFault {
action_suffix,
code,
reason,
});
(StatusCode::OK, "fault injected\n".to_string())
}
async fn handle_clear_faults(State(ctx): State<Arc<Ctx>>) -> impl IntoResponse {
ctx.faults.clear_all();
(StatusCode::OK, "faults cleared\n".to_string())
}
#[cfg(test)]
mod tests {
use crate::OnvifClient;
use crate::mock::MockServer;
#[tokio::test]
async fn bound_server_roundtrips_via_real_http() {
let server = MockServer::start().await.unwrap();
let client = OnvifClient::new(server.device_url());
let info = client.get_device_info().await.unwrap();
assert_eq!(info.manufacturer, "oxvif-mock");
}
#[tokio::test]
async fn bound_server_set_then_get_roundtrips() {
let server = MockServer::start().await.unwrap();
let client = OnvifClient::new(server.device_url());
client.set_hostname("bound-cam").await.unwrap();
let h = client.get_hostname().await.unwrap();
assert_eq!(h.name.as_deref(), Some("bound-cam"));
assert_eq!(server.device().read().hostname, "bound-cam");
}
#[tokio::test]
async fn bound_server_start_firmware_upgrade_returns_upload_uri() {
let server = MockServer::start().await.unwrap();
let client = OnvifClient::new(server.device_url());
let start = client.start_firmware_upgrade().await.unwrap();
assert!(start.upload_uri.ends_with("/upload/firmware"));
assert_eq!(start.expected_down_time, "PT30S");
}
#[tokio::test]
async fn bound_server_start_system_restore_returns_upload_uri() {
let server = MockServer::start().await.unwrap();
let client = OnvifClient::new(server.device_url());
let start = client.start_system_restore().await.unwrap();
assert!(start.upload_uri.ends_with("/upload/restore"));
}
#[tokio::test]
async fn bound_server_system_uris_includes_backup() {
let server = MockServer::start().await.unwrap();
let client = OnvifClient::new(server.device_url());
let uris = client.get_system_uris().await.unwrap();
assert!(uris.system_backup_uri.is_some());
}
}