use super::authenticator::AuthenticatorType;
use super::counter::curl::CounterType;
use super::paginator::curl::PaginatorType;
use super::Connector;
use crate::document::Document;
use crate::helper::mustache::Mustache;
use crate::helper::string::{DisplayOnlyForDebugging, Obfuscate};
use crate::{DataResult, DataSet, DataStream, Metadata};
use anyhow::Context as AnyContext;
use async_lock::Mutex;
use async_stream::stream;
use async_trait::async_trait;
use bytes::Bytes;
use dashmap::DashMap;
use futures::AsyncRead as AsyncReadIo;
use futures::AsyncWrite as AsyncWriteIo;
use futures::{AsyncWriteExt, Stream};
use futures_rustls::TlsConnector;
use futures_rustls::TlsStream as RustlsTlsStream;
use http::uri::{Authority, Scheme};
use http::HeaderMap;
use http::{
header, request::Builder, HeaderName, HeaderValue, Method, Request, Response, StatusCode,
Version,
};
use http_body_util::{BodyExt, Empty, Full};
use http_cache_semantics::{BeforeRequest, CachePolicy};
use hyper::body::Body;
use hyper::client::conn::http1::{Connection as ConnectionHttp1, SendRequest as SendRequestHttp1};
use hyper::client::conn::http2::SendRequest as SendRequestHttp2;
use json_value_merge::Merge;
use json_value_search::Search;
use rand::Rng;
use rustls::pki_types::pem::PemObject;
use rustls::pki_types::CertificateDer;
use rustls::{ClientConfig, RootCertStore};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use smol::io;
use smol::net::TcpStream;
use smol::{Executor, Timer};
use smol_hyper::rt::{FuturesIo, SmolExecutor};
use smol_timeout::TimeoutExt;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::{Arc, OnceLock};
use std::task::{Context, Poll};
use std::time::{Duration, SystemTime};
use std::{
fmt,
io::{Error, ErrorKind, Result},
};
use webpki_roots::TLS_SERVER_ROOTS;
/// Status codes that `Curl::follow_redirects` treats as a redirection to follow.
const REDIRECT_CODES: &[StatusCode; 5] = &[
StatusCode::MOVED_PERMANENTLY,
StatusCode::FOUND,
StatusCode::SEE_OTHER,
StatusCode::TEMPORARY_REDIRECT,
StatusCode::PERMANENT_REDIRECT,
];
/// Timeout, in seconds, used when the connector's `timeout` field is `None`.
const DEFAULT_TIMEOUT: u64 = 5;
/// Cache directory, joined to the system temporary directory by `CachedEntry`.
const DEFAULT_CACHE_DIR: &str = "cache/http";
/// Host name used when a URI carries no host or authority.
const DEFAULT_HOSTNAME: &str = "localhost";
/// Cap, in seconds, applied to the retry delay in `backoff` before jitter is applied.
const DEFAULT_MAX_RETRY_DELAY: u64 = 30;
/// Pinned, boxed, type-erased request body yielding `Bytes` with `io::Error` errors.
type DynBody = Pin<Box<dyn Body<Data = Bytes, Error = io::Error> + Send + Sync>>;
/// Concurrent map of clients keyed by HTTP version and endpoint.
type SharedClients = DashMap<SharedClientKey, Arc<ClientType>>;
/// Process-wide client registry, initialised on first use by `get_or_create_client`.
/// In the visible code only HTTP/2 clients are stored in it.
static CLIENTS: OnceLock<SharedClients> = OnceLock::new();
/// HTTP connector configuration and state.
///
/// Every field falls back to the value in `Default for Curl` when missing
/// (`serde(default)`), and unknown fields are rejected at deserialization.
#[derive(Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct Curl {
/// Document used to encode request bodies and decode responses. Never
/// (de)serialized; it is installed through `set_document`.
#[serde(skip)]
document: Option<Box<dyn Document>>,
/// Connector metadata; `metadata()` merges it with the document's metadata.
#[serde(rename = "metadata")]
#[serde(alias = "meta")]
pub metadata: Metadata,
/// Optional authenticator whose header is added to every request.
#[serde(alias = "auth")]
#[serde(rename = "authenticator")]
pub authenticator_type: Option<Box<AuthenticatorType>>,
/// Base URL; it is concatenated with `path` to build the request URI.
pub endpoint: String,
/// Path appended to `endpoint`; may contain mustache placeholders that are
/// resolved from `parameters` and the metadata.
pub path: String,
/// HTTP method; upper-cased when deserialized.
#[serde(with = "method_uppercase")]
pub method: Method,
/// Extra headers added to every request.
#[serde(with = "http_serde::header_map")]
pub headers: HeaderMap,
/// Timeout in seconds; `DEFAULT_TIMEOUT` is used when `None`.
pub timeout: Option<u64>,
/// Values used to resolve mustache placeholders. When an `/input` entry
/// exists, `fetch` and `headers` use it as the request payload template.
#[serde(alias = "params")]
pub parameters: Value,
/// Strategy used by `paginate`.
#[serde(alias = "paginator")]
pub paginator_type: PaginatorType,
/// Strategy used by `len`; `len` returns 0 when `None`.
#[serde(alias = "counter")]
#[serde(alias = "count")]
pub counter_type: Option<CounterType>,
/// Maximum number of redirections followed for one request.
pub redirection_limit: u8,
/// HTTP version; accepts integers, floats or strings (see `http_version_serde`).
#[serde(with = "http_version_serde")]
pub version: Version,
/// Enables the on-disk response cache used by `fetch`, `headers` and `erase`.
#[serde(alias = "cache")]
#[serde(alias = "cache_enabled")]
pub is_cached: bool,
/// Path of a PEM file whose certificates are added to the TLS root store.
#[serde(alias = "crt")]
pub certificate: Option<String>,
/// Retry policy; `send_with_retry` falls back to `RetryPolicy::default()` when `None`.
#[serde(alias = "retry")]
pub retry_policy: Option<RetryPolicy>,
/// Client created on first use by `client()`; never (de)serialized.
#[serde(skip)]
#[serde(default)]
client: Option<Arc<ClientType>>,
}
/// Serde adapter for `http::Version`.
///
/// Deserialization accepts an integer (`1`, `2`, `3`), a float (`1.0`, `1.1`,
/// `2.0`, `3.0`) or a string (`"1.1"`, `"HTTP/2"`, ...). Serialization always
/// emits a string.
mod http_version_serde {
    use http::Version;
    use serde::de::{self, Visitor};
    use serde::{Deserializer, Serializer};
    use std::fmt;

    /// Message shared by every "value not recognised" failure.
    const UNSUPPORTED: &str = "unsupported HTTP version";

    /// Deserialize a version from any of the supported representations.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<Version, D::Error>
    where
        D: Deserializer<'de>,
    {
        deserializer.deserialize_any(HttpVersionVisitor)
    }

    /// Visitor mapping numbers and strings onto `Version` constants.
    struct HttpVersionVisitor;

    impl<'de> Visitor<'de> for HttpVersionVisitor {
        type Value = Version;

        fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
            f.write_str("an HTTP version as integer, float, or string")
        }

        fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            let version = match s {
                "1" | "1.0" | "HTTP/1.0" => Version::HTTP_10,
                "1.1" | "HTTP/1.1" => Version::HTTP_11,
                "2" | "2.0" | "HTTP/2" | "HTTP/2.0" => Version::HTTP_2,
                "3" | "3.0" | "HTTP/3" | "HTTP/3.0" => Version::HTTP_3,
                _ => return Err(E::custom(UNSUPPORTED)),
            };
            Ok(version)
        }

        fn visit_string<E>(self, s: String) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            self.visit_str(s.as_str())
        }

        fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            // A bare `1` means HTTP/1.0; HTTP/1.1 needs a float or a string.
            let version = match v {
                1 => Version::HTTP_10,
                2 => Version::HTTP_2,
                3 => Version::HTTP_3,
                _ => return Err(E::custom(UNSUPPORTED)),
            };
            Ok(version)
        }

        fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match u64::try_from(v) {
                Ok(unsigned) => self.visit_u64(unsigned),
                Err(_) => Err(E::custom("HTTP version cannot be negative")),
            }
        }

        fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            // Tolerance absorbing the rounding error of values such as 1.1.
            const EPS: f64 = 1e-9;
            const KNOWN: [(f64, Version); 4] = [
                (1.0, Version::HTTP_10),
                (1.1, Version::HTTP_11),
                (2.0, Version::HTTP_2),
                (3.0, Version::HTTP_3),
            ];
            if !v.is_finite() {
                return Err(E::custom("HTTP version must be finite"));
            }
            for (number, version) in KNOWN {
                if (v - number).abs() < EPS {
                    return Ok(version);
                }
            }
            Err(E::custom(UNSUPPORTED))
        }

        fn visit_f32<E>(self, v: f32) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            self.visit_f64(f64::from(v))
        }
    }

    /// Serialize a version as a short string (`"1.0"`, `"1.1"`, `"2"`, `"3"`),
    /// or `"unknown"` for any other version.
    pub fn serialize<S>(version: &Version, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let current = *version;
        let label = if current == Version::HTTP_10 {
            "1.0"
        } else if current == Version::HTTP_11 {
            "1.1"
        } else if current == Version::HTTP_2 {
            "2"
        } else if current == Version::HTTP_3 {
            "3"
        } else {
            "unknown"
        };
        serializer.serialize_str(label)
    }
}
mod method_uppercase {
use http::Method;
use serde::{Deserialize, Deserializer};
pub fn deserialize<'de, D>(deserializer: D) -> Result<Method, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
let upper = s.to_ascii_uppercase();
Method::from_bytes(upper.as_bytes()).map_err(serde::de::Error::custom)
}
pub fn serialize<S>(method: &Method, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(method.as_str())
}
}
impl Clone for Curl {
    /// Copy the whole configuration but not the live client.
    ///
    /// The clone starts with `client: None`, so it obtains its own client on
    /// first use. The certificate path is copied with the rest of the
    /// configuration; without it the clone would connect without the custom
    /// root certificate.
    fn clone(&self) -> Self {
        Self {
            document: self.document.clone(),
            metadata: self.metadata.clone(),
            authenticator_type: self.authenticator_type.clone(),
            endpoint: self.endpoint.clone(),
            path: self.path.clone(),
            method: self.method.clone(),
            headers: self.headers.clone(),
            timeout: self.timeout,
            parameters: self.parameters.clone(),
            paginator_type: self.paginator_type.clone(),
            counter_type: self.counter_type.clone(),
            redirection_limit: self.redirection_limit,
            version: self.version,
            is_cached: self.is_cached,
            certificate: self.certificate.clone(),
            retry_policy: self.retry_policy.clone(),
            client: None,
        }
    }
}
impl fmt::Debug for Curl {
    /// Debug output of the connector configuration.
    ///
    /// The endpoint is obfuscated and potentially sensitive values go through
    /// `display_only_for_debugging`. The live `client` is not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Curl")
            .field("document", &self.document.display_only_for_debugging())
            .field("metadata", &self.metadata.display_only_for_debugging())
            .field(
                "authenticator_type",
                &self.authenticator_type.display_only_for_debugging(),
            )
            .field("endpoint", &self.endpoint.to_obfuscate())
            .field("path", &self.path)
            .field("method", &self.method)
            .field("headers", &self.headers.display_only_for_debugging())
            .field("timeout", &self.timeout)
            .field("parameters", &self.parameters.display_only_for_debugging())
            .field(
                "paginator_type",
                &self.paginator_type.display_only_for_debugging(),
            )
            .field(
                "counter_type",
                &self.counter_type.display_only_for_debugging(),
            )
            .field("redirection_limit", &self.redirection_limit)
            .field("version", &self.version)
            .field("is_cached", &self.is_cached)
            .field("certificate", &self.certificate)
            // The retry policy drives request behaviour, so it belongs in the output.
            .field("retry_policy", &self.retry_policy)
            .finish()
    }
}
impl Default for Curl {
    /// Default connector: `GET` on an empty endpoint and path, default
    /// timeout, up to 5 redirections, no cache, and the default retry policy.
    fn default() -> Self {
        let timeout = Some(DEFAULT_TIMEOUT);
        let retry_policy = Some(RetryPolicy::default());
        Self {
            // State that is never deserialized.
            document: None,
            client: None,
            // Target of the request.
            endpoint: String::new(),
            path: String::new(),
            method: Method::GET,
            version: Version::default(),
            headers: HeaderMap::new(),
            parameters: Value::Null,
            // Behaviour.
            metadata: Metadata::default(),
            authenticator_type: None,
            timeout,
            paginator_type: PaginatorType::default(),
            counter_type: None,
            redirection_limit: 5,
            is_cached: false,
            certificate: None,
            retry_policy,
        }
    }
}
/// Transport used for an HTTP connection: either a raw TCP stream or a TLS
/// stream layered on top of TCP. The TLS variant is boxed.
enum SmolStream {
// Unencrypted connection.
Plain(TcpStream),
// TLS-encrypted connection.
Tls(Box<RustlsTlsStream<TcpStream>>),
}
impl AsyncReadIo for SmolStream {
    /// Forward the read to whichever stream this value wraps.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        match self.get_mut() {
            Self::Tls(tls) => Pin::new(tls).poll_read(cx, buf),
            Self::Plain(tcp) => Pin::new(tcp).poll_read(cx, buf),
        }
    }
}
impl AsyncWriteIo for SmolStream {
    /// Forward the write to whichever stream this value wraps.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        match self.get_mut() {
            Self::Tls(tls) => Pin::new(tls).poll_write(cx, buf),
            Self::Plain(tcp) => Pin::new(tcp).poll_write(cx, buf),
        }
    }

    /// Forward the flush to whichever stream this value wraps.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        match self.get_mut() {
            Self::Tls(tls) => Pin::new(tls).poll_flush(cx),
            Self::Plain(tcp) => Pin::new(tcp).poll_flush(cx),
        }
    }

    /// Forward the close to whichever stream this value wraps.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        match self.get_mut() {
            Self::Tls(tls) => Pin::new(tls).poll_close(cx),
            Self::Plain(tcp) => Pin::new(tcp).poll_close(cx),
        }
    }
}
/// Rules deciding when `send_with_retry` re-sends a request.
///
/// Missing fields take the values of `RetryPolicy::default()`; unknown fields
/// are rejected at deserialization.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(default, deny_unknown_fields)]
pub struct RetryPolicy {
// Total number of attempts, including the first one.
max_attempts: u32,
// Base delay handed to `backoff`, which grows it with the attempt number.
delay: Duration,
// Response status codes that trigger a retry; (de)serialized as integers.
#[serde(
deserialize_with = "deserialize_status_codes",
serialize_with = "serialize_status_codes"
)]
retry_on_status: Vec<StatusCode>,
// Request methods that are allowed to be retried; (de)serialized as strings.
#[serde(
deserialize_with = "deserialize_methods",
serialize_with = "serialize_methods"
)]
retry_on_method: Vec<Method>,
}
fn deserialize_status_codes<'de, D>(
deserializer: D,
) -> std::result::Result<Vec<StatusCode>, D::Error>
where
D: serde::Deserializer<'de>,
{
let raw: Vec<u16> = Vec::deserialize(deserializer)?;
raw.into_iter()
.map(StatusCode::from_u16)
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(serde::de::Error::custom)
}
fn serialize_status_codes<S>(
codes: &[StatusCode],
serializer: S,
) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let raw: Vec<u16> = codes.iter().map(|c| c.as_u16()).collect();
raw.serialize(serializer)
}
fn deserialize_methods<'de, D>(deserializer: D) -> std::result::Result<Vec<Method>, D::Error>
where
D: serde::Deserializer<'de>,
{
let raw: Vec<String> = Vec::deserialize(deserializer)?;
raw.into_iter()
.map(|s| s.parse::<Method>().map_err(serde::de::Error::custom))
.collect()
}
fn serialize_methods<S>(methods: &[Method], serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let raw: Vec<&str> = methods.iter().map(|m| m.as_str()).collect();
raw.serialize(serializer)
}
impl Default for RetryPolicy {
    /// Three attempts, a 200 ms base delay, retrying on 408/429/502/503/504
    /// for GET, HEAD, PUT, DELETE and OPTIONS requests.
    fn default() -> Self {
        let retry_on_status = Vec::from([
            StatusCode::REQUEST_TIMEOUT,
            StatusCode::TOO_MANY_REQUESTS,
            StatusCode::BAD_GATEWAY,
            StatusCode::SERVICE_UNAVAILABLE,
            StatusCode::GATEWAY_TIMEOUT,
        ]);
        let retry_on_method = Vec::from([
            Method::GET,
            Method::HEAD,
            Method::PUT,
            Method::DELETE,
            Method::OPTIONS,
        ]);
        Self {
            max_attempts: 3,
            delay: Duration::from_millis(200),
            retry_on_status,
            retry_on_method,
        }
    }
}
/// Sleep before the next retry using exponential backoff with jitter.
///
/// The delay is `base_delay * 2^attempt`, capped at
/// `DEFAULT_MAX_RETRY_DELAY` seconds, then multiplied by a random factor in
/// `[0.5, 1.5)`. The jitter is applied after the cap, so the actual sleep can
/// exceed the cap by up to 50 %.
///
/// The power saturates instead of overflowing, because `max_attempts` comes
/// from configuration and `attempt` can therefore reach 32 or more.
async fn backoff(attempt: u32, base_delay: Duration) {
    let max_delay = Duration::from_secs(DEFAULT_MAX_RETRY_DELAY);
    let factor = 2u32.saturating_pow(attempt);
    let delay = base_delay.saturating_mul(factor).min(max_delay);
    let jitter = rand::rng().random_range(0.5..1.5);
    Timer::after(delay.mul_f64(jitter)).await;
}
fn build_request(request_builder: Builder, body: &Bytes) -> io::Result<Request<DynBody>> {
Ok(match body.len() {
0 => request_builder
.body(
Box::pin(Empty::new().map_err(|e| Error::new(ErrorKind::InvalidData, e)))
as DynBody,
)
.map_err(|e| Error::new(ErrorKind::InvalidData, e))?,
_ => request_builder
.body(Box::pin(
Full::from(body.clone()).map_err(|e| Error::new(ErrorKind::InvalidData, e)),
) as DynBody)
.map_err(|e| Error::new(ErrorKind::InvalidData, e))?,
})
}
impl Curl {
/// Return the connector's client, creating and memoizing it on first use.
#[instrument(name = "curl::client_mut")]
async fn client(&mut self) -> io::Result<Arc<ClientType>> {
    if let Some(existing) = &self.client {
        return Ok(existing.clone());
    }
    let timeout = self.timeout.unwrap_or(DEFAULT_TIMEOUT);
    let created = get_or_create_client(
        self.version,
        self.endpoint.clone(),
        timeout,
        self.certificate.clone(),
    )
    .await?;
    trace!("initialize the client in the connector");
    self.client = Some(created.clone());
    Ok(created)
}
/// Prepare a request builder with URI, method, version and headers set.
///
/// * `override_uri` - full URI to use instead of `endpoint` + resolved path.
/// * `override_method` - method to use instead of the connector's own.
/// * `body` - body that will be sent; only its length is used here.
///
/// Headers are added in this order: `Host`, then `Content-Type` and
/// `Content-Length` (only when the body is not empty and the metadata gives
/// a content type), then the configured headers, then the authenticator's.
///
/// # Errors
/// `InvalidInput` when the path still contains mustache placeholders or the
/// HTTP version is not handled, `InvalidData` when the URI or a header value
/// cannot be parsed, plus any error returned by the authenticator.
async fn request_builder(
    &mut self,
    override_uri: Option<&str>,
    override_method: Option<&Method>,
    body: Option<&Bytes>,
) -> std::io::Result<Builder> {
    let path = self.path();
    if path.has_mustache() {
        return Err(Error::new(
            ErrorKind::InvalidInput,
            format!("This path '{}' is not fully resolved", path),
        ));
    }

    let raw_uri = match override_uri {
        Some(uri) => uri.to_string(),
        None => format!("{}{}", self.endpoint, path),
    };
    let uri = raw_uri
        .parse::<hyper::Uri>()
        .with_context(|| format!("failed to parse URI: {}", raw_uri))
        .map_err(|e| Error::new(ErrorKind::InvalidData, e))?;

    // Value of the Host header: "host" or "host:port".
    let hostname = uri.host().unwrap_or(DEFAULT_HOSTNAME);
    let host = match uri.port_u16() {
        Some(port) => format!("{}:{}", hostname, port),
        None => hostname.to_string(),
    };
    let method = override_method.unwrap_or(&self.method).clone();

    // HTTP/1.0 requests are sent as HTTP/1.1.
    let wire_version = match self.version {
        Version::HTTP_10 | Version::HTTP_11 => Version::HTTP_11,
        Version::HTTP_2 => Version::HTTP_2,
        _ => {
            return Err(Error::new(
                ErrorKind::InvalidInput,
                format!("This http version '{:?}' is not managed", self.version),
            ))
        }
    };
    let mut request_builder = Request::builder()
        .uri(uri)
        .method(method)
        .version(wire_version)
        .header(header::HOST, host);

    let content_type = self.metadata().content_type();
    let body_length = body.map_or(0, |bytes| bytes.len());
    if body_length > 0 && !content_type.is_empty() {
        let content_type_value = HeaderValue::from_str(&content_type)
            .map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
        request_builder = request_builder
            .header(header::CONTENT_TYPE, content_type_value)
            .header(header::CONTENT_LENGTH, body_length);
    }

    for (header_name, header_value) in &self.headers {
        request_builder = request_builder.header(header_name, header_value);
    }

    if let Some(authenticator_type) = self.authenticator_type.clone() {
        let authenticator = authenticator_type.authenticator();
        let (auth_name, auth_value) = authenticator.authenticate().await?;
        let name = HeaderName::from_bytes(&auth_name)
            .map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
        let value = HeaderValue::from_bytes(&auth_value)
            .map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
        request_builder = request_builder.header(name, value);
    }

    Ok(request_builder)
}
#[instrument(name = "curl::headers")]
pub async fn headers(&mut self) -> std::io::Result<Vec<(String, Vec<u8>)>> {
let mut parameters_without_context = self.parameters_without_context()?;
parameters_without_context.replace_mustache(self.parameters.clone());
let dataset = vec![DataResult::Ok(parameters_without_context)];
let body = self.body(&dataset).await?;
let request_builder = self.request_builder(None, None, Some(&body)).await?;
let request = build_request(request_builder, &body)
.map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
if self.is_cached {
if let Ok(Some(cache_entry)) = CachedEntry::get(&request).await {
info!("Fetch headers from cache with success");
return Ok(cache_entry
.resp_headers
.iter()
.map(|(key, value)| (key.to_string(), value.as_bytes().to_vec()))
.collect());
}
}
let request_builder = self.request_builder(None, None, Some(&body)).await?;
let entry_to_cache = self.follow_redirects(request_builder, &body).await?;
let status = StatusCode::from_u16(entry_to_cache.status)
.map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
if !status.is_success() {
error!(
status = %entry_to_cache.status,
uri = %entry_to_cache.uri,
body = %String::from_utf8_lossy(&entry_to_cache.data).display_only_for_debugging(),
"HTTP error"
);
return Err(std::io::Error::other(format!(
"HTTP error '{}' for request '{}'",
StatusCode::from_u16(entry_to_cache.status)
.map_err(|e| Error::new(ErrorKind::InvalidData, e))?,
entry_to_cache.uri
)));
}
info!("Fetch headers with success");
Ok(entry_to_cache
.resp_headers
.iter()
.map(|(key, value)| (key.to_string().clone(), value.as_bytes().to_vec()))
.collect())
}
/// Return the `/input` entry of the parameters when it exists, otherwise a
/// copy of the whole parameters.
fn parameters_without_context(&self) -> Result<Value> {
    let input = self.parameters.clone().search("/input")?;
    Ok(input.unwrap_or_else(|| self.parameters.clone()))
}
/// Encode `dataset` as the request body with the connector's document.
///
/// Only POST, PUT and PATCH carry a body; any other method yields empty
/// bytes. For `x-www-form-urlencoded` documents one leading and one trailing
/// double quote are removed from the encoded payload.
async fn body(&self, dataset: &Vec<DataResult>) -> Result<Bytes> {
    if !matches!(self.method, Method::POST | Method::PUT | Method::PATCH) {
        return Ok(Bytes::new());
    }

    let mut document = self.document()?.clone_box();
    document.set_entry_path(String::default());

    let mut payload: Vec<u8> = Vec::default();
    payload.extend_from_slice(&document.header(dataset)?);
    payload.extend_from_slice(&document.write(dataset)?);
    payload.extend_from_slice(&document.footer(dataset)?);

    let is_form = document.metadata().mime_subtype.as_deref() == Some("x-www-form-urlencoded");
    if is_form {
        if payload.first() == Some(&b'"') {
            payload.remove(0);
        }
        if payload.last() == Some(&b'"') {
            payload.pop();
        }
    }
    Ok(Bytes::from(payload))
}
/// Send the request and follow redirections up to `redirection_limit` times.
///
/// * `request_builder` - supplies the initial URI and method.
/// * `original_bytes` - body of the initial request.
///
/// A `Location` starting with `/` is resolved against the scheme and
/// authority of the initial request; any other value is used as is.
/// On 301, 302 and 303 a request that is neither GET nor HEAD is re-sent as
/// a GET without body. GET and HEAD keep their method. 307 and 308 always
/// keep method and body.
///
/// # Errors
/// `InvalidInput` when the builder has no URI or method or when there are too
/// many redirections, `InvalidData` when a redirection has no `Location`
/// header, plus any error from sending the request.
#[instrument(name = "curl::follow_redirects")]
async fn follow_redirects(
    &mut self,
    request_builder: Builder,
    original_bytes: &Bytes,
) -> io::Result<CachedEntry> {
    let (endpoint, mut current_uri) = match request_builder.uri_ref() {
        Some(uri) => (
            format!(
                "{}://{}",
                uri.scheme().unwrap_or(&Scheme::HTTP),
                uri.authority()
                    .unwrap_or(&Authority::from_static(DEFAULT_HOSTNAME))
            ),
            uri.to_string(),
        ),
        None => return Err(Error::new(ErrorKind::InvalidInput, "Uri is required")),
    };
    let mut current_method = match request_builder.method_ref() {
        Some(method) => method.clone(),
        None => return Err(Error::new(ErrorKind::InvalidInput, "Method is required")),
    };
    let mut bytes = original_bytes.clone();

    for _ in 0..=self.redirection_limit {
        let entry_to_cache = self
            .send_with_retry(&current_uri, &current_method, &bytes)
            .await?;
        let status = StatusCode::from_u16(entry_to_cache.status)
            .map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
        if !REDIRECT_CODES.contains(&status) {
            return Ok(entry_to_cache);
        }

        let location = entry_to_cache
            .resp_headers
            .get(header::LOCATION.as_str())
            .ok_or_else(|| Error::new(ErrorKind::InvalidData, "Missing Location header"))?;
        current_uri = if location.starts_with('/') {
            format!("{}{}", endpoint, location)
        } else {
            location.to_string()
        };
        info!(%current_uri, %location, "Redirecting");

        let rewrites_method = matches!(
            status,
            StatusCode::SEE_OTHER | StatusCode::MOVED_PERMANENTLY | StatusCode::FOUND
        );
        // Both comparisons must hold: with `||` the test is always true and a
        // HEAD request would be turned into a GET.
        if rewrites_method && current_method != Method::HEAD && current_method != Method::GET {
            current_method = Method::GET;
            bytes = Bytes::new();
        }
    }
    Err(Error::new(ErrorKind::InvalidInput, "too many redirects"))
}
/// Send one request, retrying according to the retry policy.
///
/// The connector's policy is used, or `RetryPolicy::default()` when none is
/// configured. A response is retried when its status and the request method
/// are both listed in the policy. A transport error is retried when the
/// connection was closed, the message was incomplete or a timeout occurred,
/// and the method is listed. Before retrying a transport error the memoized
/// client is dropped so that `client()` requests one again.
///
/// At least one attempt is always made: a configured `max_attempts` of 0 is
/// treated as 1 instead of failing without sending anything.
///
/// # Errors
/// Returns request building or connection errors, `InvalidData` when the
/// response body cannot be read, and `Interrupted` wrapping the transport
/// error once no retry is possible.
async fn send_with_retry(
    &mut self,
    uri: &str,
    method: &Method,
    body: &Bytes,
) -> io::Result<CachedEntry> {
    let retry_policy = self.retry_policy.clone().unwrap_or_default();
    let max_attempts = retry_policy.max_attempts.max(1);
    let delay = retry_policy.delay;
    let method_is_retryable = retry_policy.retry_on_method.contains(method);

    for attempt in 1..=max_attempts {
        let request_builder = self
            .request_builder(Some(uri), Some(method), Some(body))
            .await?;
        let client = self.client().await?;
        let req_headers = match request_builder.headers_ref() {
            Some(headers) => headers_to_map(headers),
            None => HashMap::default(),
        };
        let result = match &*client {
            ClientType::Http1(sender_mutex) => {
                // The HTTP/1 sender is shared behind a mutex and locked for the send.
                let mut sender = sender_mutex.lock().await;
                sender
                    .send_request(build_request(request_builder, body)?)
                    .await
            }
            ClientType::Http2(sender) => {
                sender
                    .clone()
                    .send_request(build_request(request_builder, body)?)
                    .await
            }
        };
        match result {
            Ok(response) => {
                if retry_policy.retry_on_status.contains(&response.status())
                    && method_is_retryable
                    && attempt < max_attempts
                {
                    backoff(attempt, delay).await;
                    continue;
                }
                let resp_status = response.status().as_u16();
                let resp_headers = headers_to_map(response.headers());
                let data = response
                    .collect()
                    .await
                    .map_err(|e| Error::new(ErrorKind::InvalidData, e))?
                    .to_bytes()
                    .to_vec();
                return Ok(CachedEntry::new(
                    resp_status,
                    method.to_string(),
                    uri.to_string(),
                    req_headers,
                    resp_headers,
                    data,
                ));
            }
            Err(e) => {
                let retryable = e.is_closed() || e.is_incomplete_message() || e.is_timeout();
                if retryable && attempt < max_attempts && method_is_retryable {
                    warn!(attempt, "Retrying request after transport error: {}", e);
                    self.client = None;
                    backoff(attempt, delay).await;
                    continue;
                }
                return Err(Error::new(ErrorKind::Interrupted, e));
            }
        }
    }
    Err(Error::other("retry limit exceeded"))
}
}
async fn get_or_create_client(
version: Version,
endpoint: String,
timeout: u64,
has_certificate: Option<String>,
) -> Result<Arc<ClientType>> {
match version {
Version::HTTP_2 => {
let clients = CLIENTS.get_or_init(DashMap::new);
let key = SharedClientKey::new(version, endpoint.clone());
if let Some(existing) = clients.get(&key) {
trace!(key = ?key, "reuse client store in shared container");
return Ok(existing.clone());
}
let client = Arc::new(ClientType::Http2(
http2(endpoint.clone(), timeout, has_certificate.clone()).await?,
));
trace!(key = ?key, "create and storing new client in shared container");
let client = clients
.entry(key)
.or_insert_with(|| client.clone())
.value()
.clone();
Ok(client)
}
Version::HTTP_10 | Version::HTTP_11 => {
trace!("create new client without shared container");
Ok(Arc::new(ClientType::Http1(Arc::new(Mutex::new(
http1(endpoint, timeout, has_certificate).await?,
)))))
}
_ => panic!("Unsupported {:?}", version),
}
}
#[async_trait]
impl Connector for Curl {
/// Install the document used to encode requests and decode responses.
///
/// The box is already owned, so it is stored as is without cloning it.
fn set_document(&mut self, document: Box<dyn Document>) -> Result<()> {
    self.document = Some(document);
    Ok(())
}
/// Borrow the connector's document.
///
/// # Errors
/// Returns `InvalidInput` when no document has been set.
fn document(&self) -> Result<&dyn Document> {
    if let Some(document) = self.document.as_deref() {
        return Ok(document);
    }
    Err(Error::new(
        ErrorKind::InvalidInput,
        "The document has not been set in the connector",
    ))
}
/// Return the path with its mustache placeholders resolved.
///
/// Placeholders are resolved from the parameters merged with a `metadata`
/// entry. A path without placeholders is returned unchanged.
fn path(&self) -> String {
    if !self.is_variable() {
        return self.path.clone();
    }
    let mut context = self.parameters.clone();
    let mut metadata_entry = Map::default();
    metadata_entry.insert("metadata".to_string(), self.metadata().into());
    context.merge(&Value::Object(metadata_entry));

    let mut resolved = self.path.clone();
    resolved.replace_mustache(context);
    resolved
}
/// Tell whether `new_parameters` would make the path point to another resource.
///
/// The path template is resolved once with the current parameters and once
/// with the new ones (each merged with a `metadata` entry) and the two results
/// are compared. A path without placeholders never changes.
fn is_resource_will_change(&self, new_parameters: Value) -> Result<bool> {
    if !self.is_variable() {
        trace!("Stay link to the same resource");
        return Ok(false);
    }

    let mut metadata_kv = Map::default();
    metadata_kv.insert("metadata".to_string(), self.metadata().into());
    let metadata = Value::Object(metadata_kv);

    // Resolve the path template against one set of parameters.
    let resolve = |mut parameters: Value| -> String {
        parameters.merge(&metadata);
        let mut resolved = self.path.clone();
        resolved.replace_mustache(parameters);
        resolved
    };
    let new_path = resolve(new_parameters);
    let previous_path = resolve(self.parameters.clone());

    if previous_path == new_path {
        trace!(path = previous_path, "The path has not changed");
        return Ok(false);
    }
    info!(
        previous_path = previous_path,
        new_path = new_path,
        "Will use another resource based the new parameters"
    );
    Ok(true)
}
/// Tell whether the path template contains mustache placeholders.
fn is_variable(&self) -> bool {
self.path.has_mustache()
}
/// Replace the connector's parameters with `parameters`.
fn set_parameters(&mut self, parameters: Value) {
self.parameters = parameters;
}
/// Return the connector's metadata, merged with the document's metadata when
/// a document is set.
fn metadata(&self) -> Metadata {
    if let Some(document) = &self.document {
        return self.metadata.clone().merge(&document.metadata());
    }
    self.metadata.clone()
}
/// Count the elements of the resource with the configured counter.
///
/// Returns 0 when no counter is configured, when the counter gives no value,
/// or when counting fails; a failure is logged as a warning and not returned.
#[instrument(name = "curl::len")]
async fn len(&self) -> Result<usize> {
    let Some(counter_type) = &self.counter_type else {
        return Ok(0);
    };
    let total = match counter_type.count(self).await {
        Ok(count) => count.unwrap_or(0),
        Err(e) => {
            warn!(
                error = e.to_string(),
                "Can't count the number of element, return 0"
            );
            0
        }
    };
    Ok(total)
}
/// Fetch the resource and stream the decoded records.
///
/// When the cache is enabled and holds a fresh entry for the request, the
/// records are decoded from the cached body. Otherwise the request is sent
/// and a successful response is stored in the cache when caching is enabled.
///
/// Returns `Ok(None)` when the document reports that a live response holds
/// no data.
///
/// # Errors
/// Fails when the request cannot be built or sent, when the final status is
/// not a success, or when the document cannot decode the body.
#[instrument(name = "curl::fetch", skip(self))]
async fn fetch(&mut self) -> std::io::Result<Option<DataStream>> {
    let mut input = self.parameters_without_context()?;
    input.replace_mustache(self.parameters.clone());
    let dataset = vec![DataResult::Ok(input)];
    let body = self.body(&dataset).await?;

    // This first request is only used as the cache lookup key.
    let lookup_builder = self.request_builder(None, None, Some(&body)).await?;
    let lookup_request = build_request(lookup_builder, &body)?;
    if self.is_cached {
        if let Ok(Some(hit)) = CachedEntry::get(&lookup_request).await {
            let records = self.document()?.read(&hit.data)?;
            info!("Fetch data from cache with success");
            return Ok(Some(Box::pin(stream! {
                for record in records {
                    yield record;
                }
            })));
        }
    }

    let live_builder = self.request_builder(None, None, Some(&body)).await?;
    let mut response = self.follow_redirects(live_builder, &body).await?;
    // Record the connector's own method, whatever the redirections used.
    response.method = self.method.to_string();

    let status = StatusCode::from_u16(response.status)
        .map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
    if !status.is_success() {
        error!(
            status = %response.status,
            uri = %response.uri,
            body = %String::from_utf8_lossy(&response.data).display_only_for_debugging(),
            "HTTP error"
        );
        return Err(std::io::Error::other(format!(
            "HTTP error '{}' for request '{}'",
            status, response.uri
        )));
    }
    if self.is_cached {
        response.save().await?;
    }

    let payload = response.data;
    info!("Fetch data with success");
    let document = self.document()?;
    if !document.has_data(&payload)? {
        return Ok(None);
    }
    let records = document.read(&payload)?;
    Ok(Some(Box::pin(stream! {
        for record in records {
            yield record;
        }
    })))
}
/// Send `dataset` to the resource and stream the records of the response.
///
/// Returns `Ok(None)` when the document reports that the response holds no
/// data. The cache is not used by this method.
///
/// # Errors
/// Fails when the request cannot be built or sent, when the final status is
/// not a success, or when the document cannot decode the body.
#[instrument(name = "curl::send")]
async fn send(&mut self, dataset: &DataSet) -> std::io::Result<Option<DataStream>> {
    let body = self.body(dataset).await?;
    let builder = self.request_builder(None, None, Some(&body)).await?;
    let response = self.follow_redirects(builder, &body).await?;

    let status = StatusCode::from_u16(response.status)
        .map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
    if !status.is_success() {
        error!(
            status = %response.status,
            uri = %response.uri,
            body = %String::from_utf8_lossy(&response.data).display_only_for_debugging(),
            "HTTP error"
        );
        return Err(std::io::Error::other(format!(
            "HTTP error '{}' for request '{}'",
            status, response.uri
        )));
    }

    let payload = response.data;
    info!("Send data with success");
    let document = self.document()?;
    if !document.has_data(&payload)? {
        return Ok(None);
    }
    let records = document.read(&payload)?;
    Ok(Some(Box::pin(stream! {
        for record in records {
            yield record;
        }
    })))
}
/// Erase the resource by sending a DELETE request.
///
/// When the cache is enabled, the cache entry stored under the URI of the
/// response is removed after a successful deletion.
///
/// # Errors
/// Fails when the request cannot be built or sent, when the final status is
/// not a success, or when the cache entry cannot be removed.
#[instrument(name = "curl::erase")]
async fn erase(&mut self) -> Result<()> {
    let path = self.path();
    let body = self.body(&Vec::default()).await?;
    let delete = Method::DELETE;
    let builder = self
        .request_builder(None, Some(&delete), Some(&body))
        .await?;
    let response = self.follow_redirects(builder, &body).await?;

    let status = StatusCode::from_u16(response.status)
        .map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
    if !status.is_success() {
        error!(
            status = %response.status,
            uri = %response.uri,
            body = %String::from_utf8_lossy(&response.data).display_only_for_debugging(),
            "HTTP error"
        );
        return Err(std::io::Error::other(format!(
            "HTTP error '{}' for request '{}'",
            status, response.uri
        )));
    }

    if self.is_cached {
        response.remove().await?;
        info!("Erase cache entry with success");
    }
    info!(path, "Erase data with success");
    Ok(())
}
/// Return a stream of connectors produced by the configured paginator; the
/// work is delegated to `paginator_type.paginate`.
async fn paginate(
&self,
) -> Result<Pin<Box<dyn Stream<Item = Result<Box<dyn Connector>>> + Send>>> {
self.paginator_type.paginate(self).await
}
}
/// One HTTP exchange: the request that was sent and the response received.
/// It is also the value stored, as JSON, in the on-disk cache.
#[derive(Serialize, Deserialize)]
struct CachedEntry {
// Response status code.
pub status: u16,
// Request method, as text.
pub method: String,
// Request URI; `save` and `remove` use it as the cache key.
pub uri: String,
// Request headers (see `headers_to_map` for how they are collected).
pub req_headers: HashMap<String, String>,
// Response headers (see `headers_to_map` for how they are collected).
pub resp_headers: HashMap<String, String>,
// Raw response body.
pub data: Vec<u8>,
}
impl fmt::Debug for CachedEntry {
    /// Debug output of the entry; the body goes through
    /// `display_only_for_debugging`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("CachedEntry");
        out.field("status", &self.status);
        out.field("method", &self.method);
        out.field("uri", &self.uri);
        out.field("req_headers", &self.req_headers);
        out.field("resp_headers", &self.resp_headers);
        out.field("data", &self.data.display_only_for_debugging());
        out.finish()
    }
}
impl CachedEntry {
/// Build an entry from the parts of a request/response exchange.
fn new(
status: u16,
method: String,
uri: String,
req_headers: HashMap<String, String>,
resp_headers: HashMap<String, String>,
data: Vec<u8>,
) -> Self {
Self {
status,
method,
uri,
req_headers,
resp_headers,
data,
}
}
/// Store this entry, encoded as JSON, in the on-disk cache under its URI.
///
/// # Errors
/// Fails when the entry cannot be encoded, or with `Interrupted` when the
/// cache cannot be written.
#[instrument(name = "curl::cache_entry::save")]
async fn save(&self) -> Result<()> {
    let cache_dir = std::env::temp_dir().join(self::DEFAULT_CACHE_DIR);
    let payload = serde_json::to_vec(&self)?;
    if let Err(e) = cacache::write(cache_dir, &self.uri, payload).await {
        return Err(Error::new(ErrorKind::Interrupted, e));
    }
    trace!(uri = self.uri, "cache saved");
    Ok(())
}
/// Look up a fresh cache entry for `request`.
///
/// The entry is read from the on-disk cache under the request URI. The
/// stored request and response are rebuilt and passed to a `CachePolicy`,
/// which decides whether the entry is still fresh for `request`.
///
/// Returns `Ok(None)` when nothing can be read for the URI or when the entry
/// is stale.
///
/// # Errors
/// Fails when the stored entry cannot be decoded, or with `InvalidData` when
/// the stored request or response cannot be rebuilt.
#[instrument(name = "curl::cache_entry::get", skip(request))]
async fn get(request: &Request<DynBody>) -> Result<Option<Self>> {
    let uri = request.uri().to_string();
    let cache_dir = std::env::temp_dir().join(self::DEFAULT_CACHE_DIR);
    let raw = match cacache::read(cache_dir, &uri).await {
        Err(e) => {
            trace!(uri, "cache miss: {}", e);
            return Ok(None);
        }
        Ok(raw) => raw,
    };
    let cached: Self = serde_json::from_slice(&raw)?;

    // Rebuild the exchange that produced the entry.
    let stored_request = cached
        .req_headers
        .iter()
        .fold(
            Request::builder().method(cached.method.as_str()).uri(&uri),
            |builder, (name, value)| builder.header(name, value),
        )
        .body(())
        .map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
    let stored_response = cached
        .resp_headers
        .iter()
        .fold(
            Response::builder().status(cached.status),
            |builder, (name, value)| builder.header(name, value),
        )
        .body(())
        .map_err(|e| Error::new(ErrorKind::InvalidData, e))?;

    let policy = CachePolicy::new(&stored_request, &stored_response);
    if let BeforeRequest::Fresh(_) = policy.before_request(request, SystemTime::now()) {
        trace!(uri, entry = format!("{:?}", cached), "cache hit");
        Ok(Some(cached))
    } else {
        trace!(uri, "cache stale");
        Ok(None)
    }
}
/// Remove the cache entry stored under this entry's URI.
///
/// # Errors
/// Fails with `Interrupted` when the cache entry cannot be removed.
#[instrument(name = "curl::cache_entry::remove")]
async fn remove(&self) -> Result<()> {
    let cache_dir = std::env::temp_dir().join(self::DEFAULT_CACHE_DIR);
    if let Err(e) = cacache::remove(cache_dir, &self.uri).await {
        return Err(Error::new(ErrorKind::Interrupted, e));
    }
    trace!(uri = self.uri, "cache removed");
    Ok(())
}
}
/// Copy headers into a string map.
///
/// Headers whose value cannot be read as a string are skipped. When a name
/// appears several times, the last value is kept.
fn headers_to_map(headers: &HeaderMap) -> HashMap<String, String> {
    let mut map = HashMap::new();
    for (name, value) in headers {
        if let Ok(text) = value.to_str() {
            map.insert(name.to_string(), text.to_string());
        }
    }
    map
}
/// Request sender of an established connection, by protocol version.
pub enum ClientType {
// HTTP/1 sender, shared behind an async mutex that is locked for each send.
Http1(Arc<Mutex<SendRequestHttp1<DynBody>>>),
// HTTP/2 sender; it is cloned for each send.
Http2(SendRequestHttp2<DynBody>),
}
/// Key of the shared client map: one client per HTTP version and endpoint.
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
struct SharedClientKey {
// HTTP version spoken by the client.
version: Version,
// Endpoint the client is connected to.
endpoint: String,
}
impl SharedClientKey {
/// Build a key from an HTTP version and an endpoint.
fn new(version: Version, endpoint: String) -> Self {
Self { version, endpoint }
}
}
/// Open an HTTP/1 connection to `endpoint` and return its request sender.
///
/// * `endpoint` - URL providing the scheme (`http` or `https`), the host and
///   optionally the port.
/// * `timeout` - timeout in seconds; it is applied to the TCP connect only.
/// * `has_certificate` - optional path of a PEM file whose certificates are
///   added to the TLS root store (used for `https` only).
///
/// The connection itself is driven by a detached background task.
///
/// # Errors
/// `InvalidData` when the endpoint is not a valid URI, `InvalidInput` for a
/// missing or unsupported scheme, a missing host, an invalid server name or
/// an unreadable certificate file, `TimedOut` when the TCP connect times out,
/// `ConnectionAborted` when the HTTP handshake fails, plus TCP and TLS errors.
async fn http1(
endpoint: String,
timeout: u64,
has_certificate: Option<String>,
) -> io::Result<SendRequestHttp1<DynBody>> {
use hyper::client::conn::http1;
let base = endpoint
.parse::<hyper::Uri>()
.map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?;
let scheme: Scheme = base
.scheme_str()
.ok_or_else(|| io::Error::new(ErrorKind::InvalidInput, "missing scheme"))?
.try_into()
.map_err(|_| io::Error::new(ErrorKind::InvalidInput, "unsupported scheme"))?;
let host: String = base
.host()
.ok_or_else(|| io::Error::new(ErrorKind::InvalidInput, "missing host"))?
.to_owned();
// Default port per scheme. NOTE(review): the argument of `unwrap_or` is
// evaluated eagerly, so a scheme other than http/https returns
// "unsupported port" here even when the URI has an explicit port.
let port = base.port_u16().unwrap_or(if scheme == Scheme::HTTP {
80
} else if scheme == Scheme::HTTPS {
443
} else {
return Err(io::Error::new(ErrorKind::InvalidInput, "unsupported port"));
});
// `None` means the timeout elapsed before the connect finished.
let tcp = match TcpStream::connect((host.clone(), port))
.timeout(Duration::from_secs(timeout))
.await
{
None => return Err(io::Error::new(ErrorKind::TimedOut, "connect timeout")),
Some(Err(e)) => return Err(e),
Some(Ok(tcp)) => tcp,
};
tcp.set_nodelay(true)?;
let stream = if scheme == Scheme::HTTP {
SmolStream::Plain(tcp)
} else if scheme == Scheme::HTTPS {
// Trust the bundled web PKI roots plus the optional user certificates.
let mut roots = RootCertStore::empty();
roots.extend(TLS_SERVER_ROOTS.iter().cloned());
if let Some(certificate_path) = has_certificate {
let iter = CertificateDer::pem_file_iter(certificate_path)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
// NOTE(review): PEM items that fail to parse are skipped silently.
let certs: Vec<CertificateDer<'_>> = iter.filter_map(|res| res.ok()).collect();
roots.add_parsable_certificates(certs.into_iter());
}
let mut config = ClientConfig::builder()
.with_root_certificates(roots)
.with_no_client_auth();
// No ALPN protocol is offered on this HTTP/1 connection.
config.alpn_protocols.clear();
let connector = TlsConnector::from(Arc::new(config));
let server_name = rustls::pki_types::ServerName::try_from(host)
.map_err(|_| io::Error::new(ErrorKind::InvalidInput, "invalid DNS name"))?;
// NOTE(review): no timeout is applied to the TLS handshake.
let tls = connector.connect(server_name, tcp).await?;
SmolStream::Tls(Box::new(futures_rustls::TlsStream::Client(tls)))
} else {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"unsupported scheme",
));
};
let (sender, connection): (
SendRequestHttp1<DynBody>,
ConnectionHttp1<FuturesIo<SmolStream>, DynBody>,
) = http1::Builder::new()
.title_case_headers(false)
.handshake(FuturesIo::new(stream))
.await
.map_err(|e| io::Error::new(ErrorKind::ConnectionAborted, e))?;
// The connection future is awaited by a detached task; it only logs a
// warning when the connection ends with an error.
smol::spawn(async move {
debug!("HTTP/1 connection task started");
if let Err(e) = connection.await {
warn!(error = %e, "HTTP/1 connection closed");
}
})
.detach();
Ok(sender)
}
/// Establish an HTTP/2 connection over TLS to `endpoint` and return its
/// request sender.
///
/// The endpoint is parsed as a URI; the port defaults to 80 for `http` and
/// 443 for `https` when the URI does not carry one. After the TCP connect, a
/// TLS session offering only the `h2` ALPN protocol is negotiated, then the
/// hyper HTTP/2 handshake runs. The connection is driven by a detached
/// background task.
///
/// # Arguments
///
/// * `endpoint` - Absolute URI of the server, scheme and host included.
/// * `timeout` - Deadline, in seconds, applied to the TCP connect only.
/// * `has_certificate` - Optional path to a PEM file whose certificates are
///   added to the trusted roots on top of the bundled web PKI roots.
///
/// # Errors
///
/// * `InvalidData` when the endpoint is not a valid URI.
/// * `InvalidInput` when the scheme or host is missing, the scheme is neither
///   `http` nor `https`, the PEM file cannot be opened, or the host is not a
///   valid TLS server name.
/// * `TimedOut` when the TCP connect exceeds `timeout`.
/// * `ConnectionAborted` when the server does not select `h2` through ALPN or
///   when the HTTP/2 handshake fails.
/// * Any I/O error raised by the TCP connect or the TLS handshake.
async fn http2(
    endpoint: String,
    timeout: u64,
    has_certificate: Option<String>,
) -> io::Result<SendRequestHttp2<DynBody>> {
    let base = endpoint
        .parse::<hyper::Uri>()
        .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?;
    let scheme: Scheme = base
        .scheme_str()
        .ok_or_else(|| io::Error::new(ErrorKind::InvalidInput, "missing scheme"))?
        .try_into()
        .map_err(|_| io::Error::new(ErrorKind::InvalidInput, "unsupported scheme"))?;
    let host: String = base
        .host()
        .ok_or_else(|| io::Error::new(ErrorKind::InvalidInput, "missing host"))?
        .to_owned();

    // The default port is resolved before looking at the URI port, so an
    // unknown scheme is rejected here even when the URI names a port.
    let default_port: u16 = if scheme == Scheme::HTTP {
        80
    } else if scheme == Scheme::HTTPS {
        443
    } else {
        return Err(io::Error::new(ErrorKind::InvalidInput, "unsupported port"));
    };
    let port = base.port_u16().unwrap_or(default_port);

    let tcp = match TcpStream::connect((host.clone(), port))
        .timeout(Duration::from_secs(timeout))
        .await
    {
        // `None` means the deadline elapsed before the connect future resolved.
        None => return Err(io::Error::new(ErrorKind::TimedOut, "connect timeout")),
        Some(Err(e)) => return Err(e),
        Some(Ok(tcp)) => tcp,
    };

    // NOTE(review): TLS is negotiated whatever the scheme is, so an `http://`
    // endpoint is contacted with TLS on port 80 — confirm this is intended.
    let mut roots = RootCertStore::empty();
    roots.extend(TLS_SERVER_ROOTS.iter().cloned());
    if let Some(certificate_path) = has_certificate {
        let iter = CertificateDer::pem_file_iter(certificate_path)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
        // PEM sections that fail to parse are skipped rather than reported.
        let certs: Vec<CertificateDer<'_>> = iter.filter_map(|res| res.ok()).collect();
        roots.add_parsable_certificates(certs.into_iter());
    }
    let mut config = ClientConfig::builder()
        .with_root_certificates(roots)
        .with_no_client_auth();
    config.alpn_protocols = vec![b"h2".to_vec()];
    let connector = TlsConnector::from(Arc::new(config));
    let server_name = rustls::pki_types::ServerName::try_from(host)
        .map_err(|_| io::Error::new(ErrorKind::InvalidInput, "invalid DNS name"))?;
    let tls = connector.connect(server_name, tcp).await?;

    // The negotiated protocol depends on the remote server, so it is checked
    // at runtime in every build profile instead of through a debug assertion.
    if tls.get_ref().1.alpn_protocol() != Some(b"h2".as_slice()) {
        return Err(io::Error::new(
            ErrorKind::ConnectionAborted,
            "server did not negotiate HTTP/2 through ALPN",
        ));
    }

    let io = FuturesIo::new(tls);

    // Dedicated executor handed to hyper for the tasks it spawns.
    // NOTE(review): the task below awaits a future that never resolves, so
    // it is never finished by this function — confirm this is acceptable.
    let exec = Arc::new(Executor::new());
    smol::spawn({
        let exec = exec.clone();
        async move {
            exec.run(futures::future::pending::<()>()).await;
        }
    })
    .detach();
    let executor = SmolExecutor::new(exec);

    let (sender, connection) = hyper::client::conn::http2::Builder::new(executor)
        .handshake(io)
        .await
        .map_err(|e| io::Error::new(ErrorKind::ConnectionAborted, e))?;

    // The connection future runs detached and only logs when it ends with an error.
    smol::spawn(async move {
        debug!("HTTP/2 connection task started");
        if let Err(e) = connection.await {
            warn!(error = %e, "HTTP/2 connection closed");
        }
    })
    .detach();
    Ok(sender)
}
/// Tests of the curl connector.
///
/// The asynchronous tests talk to servers at `http://localhost:8080` and
/// `https://localhost:8084`.
// NOTE(review): the paths used (`/status`, `/get`, `/redirect`, ...) suggest
// an httpbin-compatible server — confirm against the project test setup.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::connector::authenticator::{basic::Basic, bearer::Bearer, AuthenticatorType};
    use crate::connector::counter::curl::CounterType;
    use crate::document::json::Json;
    use json_value_search::Search;
    use macro_rules_attribute::apply;
    use smol::stream::StreamExt;
    use smol_macros::test;

    /// A connector is variable once its path contains a template placeholder.
    #[test]
    fn is_variable() {
        let mut connector = Curl::default();
        assert!(!connector.is_variable());
        let params: Value = serde_json::from_str(r#"{"field":"value"}"#).unwrap();
        connector.set_parameters(params);
        connector.path = "/get/{{ field }}".to_string();
        assert!(connector.is_variable());
    }

    /// Only a templated path combined with parameters reports a changing resource.
    #[test]
    fn is_resource_will_change() {
        let mut connector = Curl::default();
        let params = serde_json::from_str(r#"{"field":"test"}"#).unwrap();
        assert!(!connector.is_resource_will_change(Value::Null).unwrap());
        connector.path = "/dir/static.ext".to_string();
        assert!(!connector.is_resource_will_change(Value::Null).unwrap());
        connector.path = "/dir/dynamic_{{ field }}.ext".to_string();
        assert!(connector.is_resource_will_change(params).unwrap());
    }

    /// The path placeholder is replaced by the matching parameter value.
    #[test]
    fn path() {
        let mut connector = Curl::default();
        connector.path = "/resource/{{ field }}".to_string();
        let params: Value = serde_json::from_str(r#"{"field":"value"}"#).unwrap();
        connector.set_parameters(params);
        assert_eq!("/resource/value", connector.path());
    }

    /// `len` is zero for `/status/200` and non-zero for `/get`.
    #[apply(test!)]
    async fn len() {
        let mut connector = Curl::default();
        connector.endpoint = "http://localhost:8080".to_string();
        connector.path = "/status/200".to_string();
        connector.counter_type = Some(CounterType::default());
        assert_eq!(
            0,
            connector.len().await.unwrap(),
            "The remote document should have a length equal to zero."
        );
        connector.path = "/get".to_string();
        assert_ne!(
            0,
            connector.len().await.unwrap(),
            "The remote document should have a length different than zero."
        );
    }

    /// `is_empty` is true for `/status/200` and false for `/get`.
    #[apply(test!)]
    async fn is_empty() {
        let mut connector = Curl::default();
        connector.endpoint = "http://localhost:8080".to_string();
        connector.path = "/status/200".to_string();
        connector.counter_type = Some(CounterType::default());
        assert!(connector.is_empty().await.unwrap());
        connector.path = "/get".to_string();
        assert!(!connector.is_empty().await.unwrap());
    }

    /// Fetch a JSON document over plain HTTP with the default HTTP version.
    #[apply(test!)]
    async fn http1_fetch() {
        let document = Json::default();
        let mut connector = Curl::default();
        connector.endpoint = "http://localhost:8080".to_string();
        connector.method = Method::GET;
        connector.path = "/json".to_string();
        connector.version = Version::default();
        connector.set_document(Box::new(document)).unwrap();
        let datastream = connector.fetch().await.unwrap().unwrap();
        assert!(
            0 < datastream.count().await,
            "The inner connector should have a size upper than zero."
        );
    }

    /// Fetch a JSON document over HTTPS with the default HTTP version and a custom CA.
    #[apply(test!)]
    async fn http1_fetch_through_https() {
        crate::init_tls().await.unwrap();
        let document = Json::default();
        let mut connector = Curl::default();
        connector.endpoint = "https://localhost:8084".to_string();
        connector.method = Method::GET;
        connector.path = "/json".to_string();
        connector.certificate = Some("./.config/my-ca.crt".to_string());
        connector.version = Version::default();
        connector.set_document(Box::new(document)).unwrap();
        let datastream = connector.fetch().await.unwrap().unwrap();
        assert!(
            0 < datastream.count().await,
            "The inner connector should have a size upper than zero."
        );
    }

    /// Fetch a JSON document over HTTPS with HTTP/2 and a custom CA.
    #[apply(test!)]
    async fn http2_fetch_through_https() {
        crate::init_tls().await.unwrap();
        let document = Json::default();
        let mut connector = Curl::default();
        connector.endpoint = "https://localhost:8084".to_string();
        connector.method = Method::GET;
        connector.path = "/json".to_string();
        connector.certificate = Some("./.config/my-ca.crt".to_string());
        connector.version = Version::HTTP_2;
        connector.set_document(Box::new(document)).unwrap();
        let datastream = connector.fetch().await.unwrap().unwrap();
        assert!(
            0 < datastream.count().await,
            "The inner connector should have a size upper than zero."
        );
    }

    /// A HEAD request yields no data stream.
    #[apply(test!)]
    async fn fetch_head() {
        let document = Json::default();
        let mut connector = Curl::default();
        connector.endpoint = "http://localhost:8080".to_string();
        connector.method = Method::HEAD;
        connector.path = "/get".to_string();
        connector.is_cached = false;
        connector.set_document(Box::new(document)).unwrap();
        assert!(
            connector.fetch().await.unwrap().is_none(),
            "A HEAD request should not return any data stream."
        );
    }

    /// Fetch a resource protected by basic authentication.
    #[apply(test!)]
    async fn fetch_with_basic() {
        let document = Json::default();
        let mut connector = Curl::default();
        connector.endpoint = "http://localhost:8080".to_string();
        connector.method = Method::GET;
        connector.path = "/basic-auth/my-username/my-password".to_string();
        connector.authenticator_type = Some(Box::new(AuthenticatorType::Basic(Basic::new(
            "my-username",
            "my-password",
        ))));
        connector.set_document(Box::new(document)).unwrap();
        let datastream = connector.fetch().await.unwrap().unwrap();
        assert!(
            0 < datastream.count().await,
            "The inner connector should have a size upper than zero."
        );
    }

    /// Fetch a resource protected by a bearer token.
    #[apply(test!)]
    async fn fetch_with_bearer() {
        let document = Json::default();
        let mut connector = Curl::default();
        connector.endpoint = "http://localhost:8080".to_string();
        connector.method = Method::GET;
        connector.path = "/bearer".to_string();
        connector.authenticator_type =
            Some(Box::new(AuthenticatorType::Bearer(Bearer::new("abcd1234"))));
        connector.set_document(Box::new(document)).unwrap();
        let datastream = connector.fetch().await.unwrap().unwrap();
        assert!(
            0 < datastream.count().await,
            "The inner connector should have a size upper than zero."
        );
    }

    /// POST a dataset and check the payload echoed back under `/data`.
    #[apply(test!)]
    async fn send() {
        let document = Json::default();
        let mut connector = Curl::default();
        connector.endpoint = "http://localhost:8080".to_string();
        connector.method = Method::POST;
        connector.path = "/post".to_string();
        let expected_result1 =
            DataResult::Ok(serde_json::from_str(r#"{"column1":"value1"}"#).unwrap());
        connector.set_document(Box::new(document)).unwrap();
        let dataset = vec![expected_result1];
        let mut datastream = connector.send(&dataset).await.unwrap().unwrap();
        let value = datastream.next().await.unwrap().to_value();
        assert_eq!(
            r#"[{"column1":"value1"}]"#,
            value.search("/data").unwrap().unwrap()
        );
    }

    /// After `erase`, the resource is reported as empty.
    #[apply(test!)]
    async fn erase() {
        let mut connector = Curl::default();
        connector.endpoint = "http://localhost:8080".to_string();
        connector.path = "/status/200".to_string();
        connector.erase().await.unwrap();
        assert!(connector.is_empty().await.unwrap());
    }

    /// `fetch` follows one redirection and fails when the limit is exceeded.
    #[apply(test!)]
    async fn test_redirection_with_fetch() {
        let document = Json::default();
        let mut connector = Curl::default();
        connector.endpoint = "http://localhost:8080".to_string();
        connector.path = "/redirect/1".to_string();
        connector.redirection_limit = 1;
        connector.set_document(Box::new(document)).unwrap();
        let datastream = connector.fetch().await.unwrap().unwrap();
        assert!(
            0 < datastream.count().await,
            "The inner connector should have a size upper than zero."
        );
        connector.path = "/redirect/2".to_string();
        connector.redirection_limit = 1;
        let result = connector.fetch().await;
        assert!(
            result.is_err(),
            "The inner connector should raise an error."
        );
    }

    /// `send` follows one redirection and fails when the limit is exceeded.
    #[apply(test!)]
    async fn test_redirection_with_send() {
        let document = Json::default();
        let expected_result1 =
            DataResult::Ok(serde_json::from_str(r#"{"column1":"value1"}"#).unwrap());
        let dataset = vec![expected_result1];
        let mut connector = Curl::default();
        connector.endpoint = "http://localhost:8080".to_string();
        connector.path = "/redirect/1".to_string();
        connector.redirection_limit = 1;
        connector.set_document(Box::new(document)).unwrap();
        let datastream = connector.send(&dataset).await.unwrap().unwrap();
        assert!(
            0 < datastream.count().await,
            "The inner connector should have a size upper than zero."
        );
        connector.path = "/redirect/2".to_string();
        connector.redirection_limit = 1;
        let result = connector.send(&dataset).await;
        assert!(
            result.is_err(),
            "The inner connector should raise an error."
        );
    }

    /// `erase` follows a 307 redirection to `/delete`.
    #[apply(test!)]
    async fn test_redirection_with_erase() {
        let mut connector = Curl::default();
        connector.endpoint = "http://localhost:8080".to_string();
        connector.path = "/redirect-to?url=/delete&status_code=307".to_string();
        connector.redirection_limit = 1;
        let result = connector.erase().await;
        assert!(
            result.is_ok(),
            "The inner connector shouldn't raise this error: {:?}",
            result
        );
    }

    /// A gateway-timeout status ends in an error.
    #[apply(test!)]
    async fn test_retry_timeout() {
        let mut connector = Curl::default();
        connector.endpoint = "http://localhost:8080".to_string();
        connector.path = format!("/status/{}", http::StatusCode::GATEWAY_TIMEOUT.as_u16());
        let result = connector.fetch().await;
        assert!(
            result.is_err(),
            "The inner connector should raise a Timeout error"
        );
    }
}