use super::{
Action, BodyData, ConnectionId, Direction, Error, Event, NetworkTraffic, Request, Response,
Version,
};
use aws_smithy_runtime_api::client::connector_metadata::ConnectorMetadata;
use aws_smithy_runtime_api::client::http::{
HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpConnector,
};
use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_runtime_api::shared::IntoShared;
use aws_smithy_types::body::SdkBody;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::{fs, io};
use tokio::task::JoinHandle;
/// An HTTP connector wrapper that records traffic as a list of [`Event`]s
/// while forwarding every request to an inner connector.
///
/// The event log and the id counter are both held behind `Arc`, so values
/// produced by the derived `Clone` share the same log and counter.
#[derive(Clone, Debug)]
pub struct RecordingClient {
/// Events recorded so far, in the order they were pushed.
pub(crate) data: Arc<Mutex<Vec<Event>>>,
/// Counter used to hand out the next `ConnectionId`; bumped once per call.
pub(crate) num_events: Arc<AtomicUsize>,
/// The wrapped connector that actually performs the requests.
pub(crate) inner: SharedHttpConnector,
}
#[cfg(feature = "legacy-rustls-ring")]
impl RecordingClient {
    /// Builds a recording client on top of the (deprecated) hyper 0.14
    /// connector configured via `build_https()`.
    ///
    /// The event log starts empty and the id counter starts at zero.
    pub fn https() -> Self {
        // The hyper 0.14 connector is deprecated; it is still used here on
        // purpose, so silence the lint for both the import and its use.
        #[allow(deprecated)]
        use crate::hyper_014::HyperConnector;

        #[allow(deprecated)]
        let https_connector = HyperConnector::builder().build_https();

        Self {
            data: Default::default(),
            num_events: Arc::new(AtomicUsize::new(0)),
            inner: SharedHttpConnector::new(https_connector),
        }
    }
}
impl RecordingClient {
    /// Creates a recording client that forwards requests to
    /// `underlying_connector` and records the traffic it observes.
    ///
    /// The event log starts empty and the id counter starts at zero.
    pub fn new(underlying_connector: impl HttpConnector + 'static) -> Self {
        Self {
            data: Default::default(),
            num_events: Arc::new(AtomicUsize::new(0)),
            inner: underlying_connector.into_shared(),
        }
    }

    /// Returns a guard over the events recorded so far.
    ///
    /// The guard holds the same mutex that recording uses, so new events
    /// cannot be pushed until it is dropped.
    ///
    /// # Panics
    ///
    /// Panics if the event mutex has been poisoned.
    pub fn events(&self) -> MutexGuard<'_, Vec<Event>> {
        self.data.lock().unwrap()
    }

    /// Returns a snapshot (a clone) of the recorded events wrapped in a
    /// [`NetworkTraffic`] value tagged with `Version::V0`.
    pub fn network_traffic(&self) -> NetworkTraffic {
        NetworkTraffic {
            events: self.events().clone(),
            docs: Some("todo docs".into()),
            version: Version::V0,
        }
    }

    /// Serializes the recorded traffic as JSON and writes it to `path`.
    ///
    /// # Errors
    ///
    /// Returns an error if the traffic cannot be serialized (the
    /// `serde_json` error is converted into an `io::Error`) or if writing
    /// the file fails.
    pub fn dump_to_file(&self, path: impl AsRef<Path>) -> Result<(), io::Error> {
        // Propagate serialization failures instead of panicking: the function
        // already returns a `Result`, and `serde_json::Error` converts into
        // `io::Error` through its `From` impl.
        let json = serde_json::to_string(&self.network_traffic())?;
        fs::write(path, json)
    }

    /// Hands out the next connection id by incrementing the shared counter.
    fn next_id(&self) -> ConnectionId {
        ConnectionId(self.num_events.fetch_add(1, Ordering::Relaxed))
    }
}
fn record_body(
body: &mut SdkBody,
event_id: ConnectionId,
direction: Direction,
event_bus: Arc<Mutex<Vec<Event>>>,
) -> JoinHandle<()> {
let (sender, output_body) = crate::test_util::body::channel_body();
let real_body = std::mem::replace(body, output_body);
tokio::spawn(async move {
let mut real_body = real_body;
let mut sender = sender;
loop {
let data = crate::test_util::body::next_data_frame(&mut real_body).await;
match data {
Some(Ok(data)) => {
event_bus.lock().unwrap().push(Event {
connection_id: event_id,
action: Action::Data {
data: BodyData::from(data.clone()),
direction,
},
});
if sender.send_data(data).await.is_err() {
event_bus.lock().unwrap().push(Event {
connection_id: event_id,
action: Action::Eof {
direction: direction.opposite(),
ok: false,
},
})
};
}
None => {
event_bus.lock().unwrap().push(Event {
connection_id: event_id,
action: Action::Eof {
ok: true,
direction,
},
});
drop(sender);
break;
}
Some(Err(_err)) => {
event_bus.lock().unwrap().push(Event {
connection_id: event_id,
action: Action::Eof {
ok: false,
direction,
},
});
sender.abort();
break;
}
}
}
})
}
impl HttpConnector for RecordingClient {
fn call(&self, mut request: HttpRequest) -> HttpConnectorFuture {
let event_id = self.next_id();
self.data.lock().unwrap().push(Event {
connection_id: event_id,
action: Action::Request {
request: Request::from(&request),
},
});
record_body(
request.body_mut(),
event_id,
Direction::Request,
self.data.clone(),
);
let events = self.data.clone();
let resp_fut = self.inner.call(request);
let fut = async move {
let resp = resp_fut.await;
match resp {
Ok(mut resp) => {
events.lock().unwrap().push(Event {
connection_id: event_id,
action: Action::Response {
response: Ok(Response::from(&resp)),
},
});
record_body(resp.body_mut(), event_id, Direction::Response, events);
Ok(resp)
}
Err(e) => {
events.lock().unwrap().push(Event {
connection_id: event_id,
action: Action::Response {
response: Err(Error(format!("{}", &e))),
},
});
Err(e)
}
}
};
HttpConnectorFuture::new(fut)
}
}
impl HttpClient for RecordingClient {
    /// Returns a clone of this recorder as the connector to use.
    ///
    /// Both the connector settings and the runtime components are ignored.
    fn http_connector(
        &self,
        _settings: &HttpConnectorSettings,
        _components: &RuntimeComponents,
    ) -> SharedHttpConnector {
        let recorder = self.clone();
        recorder.into_shared()
    }

    /// Reports this client under the name `"recording-client"` with no
    /// version.
    fn connector_metadata(&self) -> Option<ConnectorMetadata> {
        let metadata = ConnectorMetadata::new("recording-client", None);
        Some(metadata)
    }
}