use crate::*;
use futures::future::Shared;
use futures::task::Context;
use futures::FutureExt;
use lazy_static::lazy_static;
use pin_project::pin_project;
use std::convert::TryFrom;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::Poll;
mod recording;
use recording::*;
mod eavesdrop;
use eavesdrop::*;
pub struct TestDevice {
pub device_info: DeviceInfo,
pub device: Device<TestDeviceTransport>,
device_guard: Option<tokio::sync::OwnedMutexGuard<Option<Device<TestDeviceTransport>>>>,
writer: Option<Writer>,
}
pub fn test_with_devices<FN, F>(f: FN)
where
FN: Fn(TestDevice) -> F,
F: Future<Output = Result<(), Error<TestDeviceTransportError>>> + Send + 'static,
{
tokio::runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.unwrap()
.block_on(async {
let mut devices = fixture_test_devices();
if let Some(factory) = LIVE_TEST_DEVICE_INSTANCE_FACTORY.clone().await {
devices.push(factory.get().await);
}
let mut failures = 0usize;
let mut successes = 0usize;
for (device_info, join_handle) in devices.into_iter().map(|test_device| {
(
test_device.device_info.clone(),
tokio::spawn(f(test_device)),
)
}) {
match join_handle.await {
Ok(Ok(())) => {
successes += 1;
}
Ok(Err(Error::UnsupportedFeature)) => {
}
Ok(Err(Error::TransportError(
TestDeviceTransportError::RequestNotRecorded,
))) => {
eprintln!(
"serial {:?} version {} is missing a recording",
device_info.serial_number, device_info.firmware_version,
);
}
Ok(Err(e)) => {
eprintln!(
"error for serial {:?} version {}: {}",
device_info.serial_number, device_info.firmware_version, e
);
failures += 1;
}
Err(e) => {
eprintln!(
"failure for serial {:?} version {}: {}",
device_info.serial_number, device_info.firmware_version, e
);
failures += 1;
}
}
}
assert_ne!(successes, 0, "test must succeed for at least one device");
assert_eq!(
failures, 0,
"no test device may panic or return an unexpected error"
);
});
}
lazy_static! {
static ref FIXTURE_RECORDINGS: Vec<Arc<Recording>> = load_fixture_recordings();
static ref LIVE_TEST_DEVICE_INSTANCE_FACTORY: Shared<Pin<Box<dyn Future<Output = Option<LiveTestDeviceInstanceFactory>> + Send>>> =
LiveTestDeviceInstanceFactory::new().boxed().shared();
}
fn fixture_test_devices() -> Vec<TestDevice> {
let uri = http::Uri::from_static("http://test.device.local");
FIXTURE_RECORDINGS
.iter()
.map(|recording| {
let recording = recording.clone();
let device_info = recording.device_info.clone();
let transport = TestDeviceTransport(TestDeviceTransportInner::Recording(recording));
TestDevice {
device_info,
device: crate::Device::new(transport, uri.clone()),
device_guard: None,
writer: None,
}
})
.collect()
}
fn load_fixture_recordings() -> Vec<Arc<Recording>> {
std::fs::read_dir(fixture_dir())
.expect("read fixture dir")
.map(|entry| entry.expect("read fixture dir entry"))
.map(|entry| entry.path())
.filter(|path| path.extension().map(|s| s == "json") == Some(true))
.map(|path| {
std::thread::spawn(move || {
let bytes = std::fs::read(&path).expect("read fixture file");
serde_json::from_slice(&bytes)
.map(Arc::new)
.map_err(|e| format!("parse fixture {}: {}", path.to_string_lossy(), e))
})
})
.map(|thread| thread.join().unwrap().unwrap())
.collect()
}
#[derive(Clone)]
struct LiveTestDeviceInstanceFactory {
device_info: DeviceInfo,
device: Arc<tokio::sync::Mutex<Option<Device<TestDeviceTransport>>>>,
writer: Writer,
}
impl LiveTestDeviceInstanceFactory {
pub async fn new() -> Option<Self> {
let uri = match std::env::var("RECORD_DEVICE_URI") {
Err(std::env::VarError::NotPresent) => return None,
other => other,
}
.expect("RECORD_DEVICE_URI must be valid UTF-8");
let uri = http::Uri::try_from(uri).expect("RECORD_DEVICE_URI must be a valid URI");
let (device_info, device, writer) = new_eavesdrop_device(uri)
.await
.expect("RECORD_DEVICE_URI must be an Axis device");
let device = device
.replace_transport(|e| TestDeviceTransport(TestDeviceTransportInner::Eavesdrop(e)));
Some(Self {
device_info,
device: Arc::new(tokio::sync::Mutex::new(Some(device))),
writer,
})
}
pub async fn get(&self) -> TestDevice {
let mut device_guard = self.device.clone().lock_owned().await;
TestDevice {
device_info: self.device_info.clone(),
device: device_guard.take().expect("must contain device"),
device_guard: Some(device_guard),
writer: Some(self.writer.clone()),
}
}
}
impl Drop for TestDevice {
fn drop(&mut self) {
if let Some(guard) = self.device_guard.as_mut() {
let mut device = Device::new(
TestDeviceTransport::default(),
http::Uri::from_static("http://1.2.3.4"),
);
std::mem::swap(&mut device, &mut self.device);
guard.replace(device);
}
if let Some(writer) = self.writer.as_mut() {
writer.write_as_fixture();
}
}
}
pub struct TestDeviceTransport(TestDeviceTransportInner);
impl Default for TestDeviceTransport {
fn default() -> Self {
Self(TestDeviceTransportInner::Recording(Arc::new(
Recording::default(),
)))
}
}
enum TestDeviceTransportInner {
Recording(Arc<Recording>),
Eavesdrop(EavesdropTransport<HyperTransport>),
}
impl Transport for TestDeviceTransport {
type Error = TestDeviceTransportError;
type Output = TestDeviceTransportOutput;
type Body = TestDeviceTransportBody;
type Chunk = Vec<u8>;
fn roundtrip(&self, request: http::Request<Vec<u8>>) -> Self::Output {
match &self.0 {
TestDeviceTransportInner::Recording(r) => {
TestDeviceTransportOutput(TestDeviceTransportOutputInner::Recording(
r.find(&request).map(|resp| resp.clone()),
))
}
TestDeviceTransportInner::Eavesdrop(t) => {
if let Some(recorded) = t.get(&request) {
TestDeviceTransportOutput(TestDeviceTransportOutputInner::Recording(Some(
recorded,
)))
} else {
TestDeviceTransportOutput(TestDeviceTransportOutputInner::Eavesdrop(Box::new(
t.roundtrip(request),
)))
}
}
}
}
}
#[derive(Debug)]
pub enum TestDeviceTransportError {
RequestNotRecorded,
LiveError(::hyper::Error),
}
impl std::error::Error for TestDeviceTransportError {}
impl fmt::Display for TestDeviceTransportError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
TestDeviceTransportError::RequestNotRecorded => write!(f, "request not recorded"),
TestDeviceTransportError::LiveError(e) => write!(f, "live error: {}", e),
}
}
}
#[pin_project]
pub struct TestDeviceTransportOutput(#[pin] TestDeviceTransportOutputInner);
#[pin_project]
enum TestDeviceTransportOutputInner {
Recording(#[pin] Option<RecordedHttpResponse>),
Eavesdrop(#[pin] Box<EavesdropTransportOutput<HyperTransport>>),
}
impl Future for TestDeviceTransportOutput {
type Output = Result<http::Response<TestDeviceTransportBody>, TestDeviceTransportError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project().0.project() {
__TestDeviceTransportOutputInnerProjection::Recording(mut o) => match o.take() {
Some(resp) => {
let resp = resp
.http_response_builder()
.body(TestDeviceTransportBody(
TestDeviceTransportBodyInner::Recording(Some(
resp.body.as_slice().to_vec(),
)),
))
.unwrap();
Poll::Ready(Ok(resp))
}
None => Poll::Ready(Err(TestDeviceTransportError::RequestNotRecorded)),
},
__TestDeviceTransportOutputInnerProjection::Eavesdrop(o) => {
o.poll(cx).map(|r| match r {
Ok(h) => {
let (parts, body) = h.into_parts();
let body = TestDeviceTransportBody(
TestDeviceTransportBodyInner::Eavesdrop(Box::new(body)),
);
Ok(http::Response::from_parts(parts, body))
}
Err(e) => Err(TestDeviceTransportError::LiveError(e)),
})
}
}
}
}
#[pin_project]
pub struct TestDeviceTransportBody(#[pin] TestDeviceTransportBodyInner);
#[pin_project]
enum TestDeviceTransportBodyInner {
Recording(#[pin] Option<Vec<u8>>),
Eavesdrop(#[pin] Box<EavesdropTransportBody<HyperTransport>>),
}
impl futures::Stream for TestDeviceTransportBody {
type Item = Result<Vec<u8>, TestDeviceTransportError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.project().0.project() {
__TestDeviceTransportBodyInnerProjection::Recording(mut b) => match b.take() {
Some(bytes) => Poll::Ready(Some(Ok(bytes))),
None => Poll::Ready(None),
},
__TestDeviceTransportBodyInnerProjection::Eavesdrop(b) => match b.poll_next(cx) {
Poll::Ready(Some(Ok(c))) => Poll::Ready(Some(Ok(c.as_ref().to_vec()))),
Poll::Ready(Some(Err(e))) => {
Poll::Ready(Some(Err(TestDeviceTransportError::LiveError(e))))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
},
}
}
}