pub use fastly_shared::{CacheOverride, FastlyRequestMetadata};
use crate::abi;
use crate::backend::validate_backend;
use crate::body::{Body, BodyHandle};
use crate::error::{anyhow, ensure, Error};
use crate::response::{handles_to_response, ResponseHandle};
use bytes::{BufMut, BytesMut};
use http::header::{HeaderName, HeaderValue};
use http::{Extensions, Method, Request, Response, Uri, Version};
use lazy_static::lazy_static;
use std::convert::{TryFrom, TryInto};
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::sync::Mutex;
#[derive(Debug, Eq, Hash, PartialEq)]
#[repr(transparent)]
pub struct RequestHandle {
handle: u32,
}
lazy_static! {
pub(crate) static ref GOT_DOWNSTREAM: Mutex<bool> = Mutex::new(false);
}
impl RequestHandle {
pub const INVALID: Self = RequestHandle {
handle: fastly_shared::INVALID_REQUEST_HANDLE,
};
pub fn is_invalid(&self) -> bool {
self == &Self::INVALID
}
pub(crate) unsafe fn handle(&self) -> Self {
Self {
handle: self.handle,
}
}
pub fn downstream() -> Result<Self, Error> {
let mut got = GOT_DOWNSTREAM.lock().unwrap();
if *got {
return Err(Error::msg(
"cannot get more than one handle to the downstream request per execution",
));
}
let mut handle = RequestHandle::INVALID;
let status = unsafe { abi::xqd_req_body_downstream_get(&mut handle, std::ptr::null_mut()) };
if status.is_err() || handle.is_invalid() {
Err(Error::msg("xqd_req_body_downstream_get failed"))
} else {
*got = true;
Ok(handle)
}
}
pub fn new() -> Result<Self, Error> {
let mut handle = RequestHandle::INVALID;
let status = unsafe { abi::xqd_req_new(&mut handle) };
if status.is_err() || handle.is_invalid() {
Err(Error::msg("xqd_req_new failed"))
} else {
Ok(handle)
}
}
pub fn get_header_names<'a>(
&'a self,
buf_size: usize,
) -> impl Iterator<Item = Result<HeaderName, Error>> + 'a {
abi::MultiValueHostcall::new(
b'\0',
buf_size,
move |buf, buf_size, cursor, ending_cursor, nwritten| unsafe {
abi::xqd_req_header_names_get(
self.handle(),
buf,
buf_size,
cursor,
ending_cursor,
nwritten,
)
},
)
.map(|res| {
res.and_then(|name_bytes| {
HeaderName::from_bytes(&name_bytes)
.map_err(|e| anyhow!("invalid header name: {}", e))
})
})
}
pub fn get_header_values<'a>(
&'a self,
name: &'a HeaderName,
buf_size: usize,
) -> impl Iterator<Item = Result<HeaderValue, Error>> + 'a {
abi::MultiValueHostcall::new(
b'\0',
buf_size,
move |buf, buf_size, cursor, ending_cursor, nwritten| unsafe {
let name: &[u8] = name.as_ref();
abi::xqd_req_header_values_get(
self.handle(),
name.as_ptr(),
name.len(),
buf,
buf_size,
cursor,
ending_cursor,
nwritten,
)
},
)
.map(|res| {
res.map(|value_bytes| unsafe {
HeaderValue::from_maybe_shared_unchecked(value_bytes)
})
})
}
pub fn set_header_values<'a, I>(&mut self, name: &HeaderName, values: I) -> Result<(), Error>
where
I: IntoIterator<Item = &'a HeaderValue>,
{
let mut buf = vec![];
for value in values {
buf.put(value.as_bytes());
buf.put_u8(b'\0');
}
let name: &[u8] = name.as_ref();
let status = unsafe {
abi::xqd_req_header_values_set(
self.handle(),
name.as_ptr(),
name.len(),
buf.as_ptr(),
buf.len(),
)
};
if status.is_err() {
Err(Error::msg("xqd_req_header_values_set failed"))
} else {
Ok(())
}
}
pub fn insert_header(&mut self, name: &HeaderName, value: &HeaderValue) -> Result<(), Error> {
let name_bytes: &[u8] = name.as_ref();
let value_bytes: &[u8] = value.as_ref();
let status = unsafe {
abi::xqd_req_header_insert(
self.handle(),
name_bytes.as_ptr(),
name_bytes.len(),
value_bytes.as_ptr(),
value_bytes.len(),
)
};
if status.is_err() {
Err(Error::msg("xqd_req_header_insert returned error"))
} else {
Ok(())
}
}
pub fn append_header(&mut self, name: &HeaderName, value: &HeaderValue) -> Result<(), Error> {
let name_bytes: &[u8] = name.as_ref();
let value_bytes: &[u8] = value.as_ref();
let status = unsafe {
abi::xqd_req_header_append(
self.handle(),
name_bytes.as_ptr(),
name_bytes.len(),
value_bytes.as_ptr(),
value_bytes.len(),
)
};
if status.is_err() {
Err(Error::msg("xqd_req_header_append returned error"))
} else {
Ok(())
}
}
pub fn get_version(&self) -> Result<Version, Error> {
let mut version = 0;
let status = unsafe { abi::xqd_req_version_get(self.handle(), &mut version) };
if status.is_err() {
Err(Error::msg("xqd_req_version_get failed"))
} else {
abi::HttpVersion::try_from(version)
.map(Into::into)
.map_err(Error::msg)
}
}
pub fn set_version(&mut self, v: Version) -> Result<(), Error> {
let status =
unsafe { abi::xqd_req_version_set(self.handle(), abi::HttpVersion::from(v) as u32) };
if status.is_err() {
Err(Error::msg("xqd_req_version_get failed"))
} else {
Ok(())
}
}
pub fn get_method(&self, max_length: usize) -> Result<Method, Error> {
let mut method_bytes = Vec::with_capacity(max_length);
let mut nwritten = 0;
let status = unsafe {
abi::xqd_req_method_get(
self.handle(),
method_bytes.as_mut_ptr(),
method_bytes.capacity(),
&mut nwritten,
)
};
if status.is_err() {
return Err(Error::msg("xqd_req_method_get failed"));
}
assert!(
nwritten <= method_bytes.capacity(),
"xqd_req_method_get wrote too many bytes"
);
unsafe {
method_bytes.set_len(nwritten);
}
Method::from_bytes(&method_bytes)
.map_err(|_| anyhow!("invalid method: {}", String::from_utf8_lossy(&method_bytes)))
}
pub fn set_method(&self, method: &Method) -> Result<(), Error> {
let method_bytes = method.as_str().as_bytes();
let status = unsafe {
abi::xqd_req_method_set(self.handle(), method_bytes.as_ptr(), method_bytes.len())
};
if status.is_err() {
Err(Error::msg("xqd_req_method_set failed"))
} else {
Ok(())
}
}
pub fn get_uri(&self, max_length: usize) -> Result<Uri, Error> {
let mut uri_bytes = BytesMut::with_capacity(max_length);
let mut nwritten = 0;
let status = unsafe {
abi::xqd_req_uri_get(
self.handle(),
uri_bytes.as_mut_ptr(),
uri_bytes.capacity(),
&mut nwritten,
)
};
if status.is_err() {
return Err(Error::msg("xqd_req_uri_get failed"));
}
assert!(
nwritten <= uri_bytes.capacity(),
"xqd_req_uri_get wrote too many bytes"
);
unsafe {
uri_bytes.set_len(nwritten);
}
Uri::from_maybe_shared(uri_bytes.freeze()).map_err(|e| anyhow!("invalid URI: {}", e))
}
pub fn set_uri(&mut self, uri: &Uri) -> Result<(), Error> {
let uri_bytes = uri.to_string().into_bytes();
let status =
unsafe { abi::xqd_req_uri_set(self.handle(), uri_bytes.as_ptr(), uri_bytes.len()) };
if status.is_err() {
Err(Error::msg("xqd_req_uri_set failed"))
} else {
Ok(())
}
}
pub fn send(
self,
body: BodyHandle,
backend: &str,
) -> Result<(ResponseHandle, BodyHandle), Error> {
let mut resp_handle = ResponseHandle::INVALID;
let mut resp_body_handle = BodyHandle::INVALID;
let status = unsafe {
abi::xqd_req_send(
self.handle(),
body.handle(),
backend.as_ptr(),
backend.len(),
&mut resp_handle,
&mut resp_body_handle,
)
};
if status.is_err() || resp_handle.is_invalid() || resp_body_handle.is_invalid() {
Err(Error::msg("xqd_req_send failed"))
} else {
Ok((resp_handle, resp_body_handle))
}
}
pub fn send_async(
self,
body: BodyHandle,
backend: &str,
) -> Result<PendingRequestHandle, Error> {
let mut pending_req_handle = PendingRequestHandle::INVALID;
let status = unsafe {
abi::xqd_req_send_async(
self.handle(),
body.handle(),
backend.as_ptr(),
backend.len(),
&mut pending_req_handle,
)
};
if status.is_err() || pending_req_handle.is_invalid() {
Err(Error::msg("xqd_req_send_async failed"))
} else {
Ok(pending_req_handle)
}
}
pub fn set_cache_override(&mut self, cache_override: CacheOverride) {
let (tag, ttl, swr) = cache_override.to_abi();
let status = unsafe { abi::xqd_req_cache_override_set(self.handle(), tag, ttl, swr) };
if status.is_err() {
panic!("xqd_req_cache_override_set failed");
}
}
}
pub fn downstream_request_and_body_handles() -> Result<(RequestHandle, BodyHandle), Error> {
let mut got_req = crate::request::GOT_DOWNSTREAM.lock().unwrap();
let mut got_body = crate::body::GOT_DOWNSTREAM.lock().unwrap();
if *got_req || *got_body {
return Err(Error::msg(
"cannot get more than one handle to the downstream request per execution",
));
}
let mut req_handle = RequestHandle::INVALID;
let mut body_handle = BodyHandle::INVALID;
let status = unsafe { abi::xqd_req_body_downstream_get(&mut req_handle, &mut body_handle) };
if status.is_err() || req_handle.is_invalid() || body_handle.is_invalid() {
Err(Error::msg("xqd_req_body_downstream_get failed"))
} else {
*got_req = true;
*got_body = true;
Ok((req_handle, body_handle))
}
}
pub fn downstream_request() -> Result<Request<Body>, Error> {
let (req_handle, body_handle) = downstream_request_and_body_handles()?;
let mut req = Request::builder()
.version(req_handle.get_version()?)
.method(req_handle.get_method(crate::METHOD_MAX_LEN)?)
.uri(req_handle.get_uri(crate::URI_MAX_LEN)?);
for name in req_handle.get_header_names(crate::HEADER_NAME_MAX_LEN) {
let name = name?;
for value in req_handle.get_header_values(&name, crate::HEADER_VALUE_MAX_LEN) {
req = req.header(&name, value?);
}
}
Ok(req.body(body_handle.into())?)
}
pub fn downstream_client_ip_addr() -> Option<IpAddr> {
let mut octets = [0; 16];
let mut nwritten = 0;
let status =
unsafe { abi::xqd_req_downstream_client_ip_addr(octets.as_mut_ptr(), &mut nwritten) };
if status.is_err() {
panic!("downstream_client_ip_addr failed");
}
match nwritten {
0 => None,
4 => {
let octets: [u8; 4] = octets[0..4]
.try_into()
.expect("octets is at least 4 bytes long");
let addr: Ipv4Addr = octets.into();
Some(addr.into())
}
16 => {
let addr: Ipv6Addr = octets.into();
Some(addr.into())
}
_ => panic!("downstream_client_ip_addr wrote an unexpected number of bytes"),
}
}
pub fn downstream_tls_client_hello() -> Result<Vec<u8>, Error> {
let mut ch_size = 0;
let status =
unsafe { abi::xqd_req_downstream_tls_client_hello(std::ptr::null_mut(), 0, &mut ch_size) };
if status.is_err() && ch_size == 0 {
panic!("couldn't get the downstream TLS client hello");
}
let mut buf = vec![0; ch_size];
let status = unsafe {
abi::xqd_req_downstream_tls_client_hello(buf.as_mut_ptr(), buf.len(), &mut ch_size)
};
if status.is_err() {
panic!("couldn't get the downstream TLS cipher protocol");
}
Ok(buf)
}
pub fn downstream_tls_cipher_openssl_name() -> &'static str {
lazy_static! {
static ref OPENSSL_NAME: String = {
let mut buf = vec![0; 128];
let mut nwritten = 0;
let status = unsafe {
abi::xqd_req_downstream_tls_cipher_openssl_name(
buf.as_mut_ptr(),
buf.len(),
&mut nwritten,
)
};
if status.is_err() {
panic!("couldn't get the downstream TLS cipher's OpenSSL name");
}
buf.truncate(nwritten);
String::from_utf8(buf).expect("TLS cipher OpenSSL name must be valid UTF-8")
};
}
OPENSSL_NAME.as_str()
}
pub fn downstream_tls_protocol() -> &'static str {
lazy_static! {
static ref PROTOCOL: String = {
let mut buf = vec![0; 32];
let mut nwritten = 0;
let status = unsafe {
abi::xqd_req_downstream_tls_protocol(buf.as_mut_ptr(), buf.len(), &mut nwritten)
};
if status.is_err() {
panic!("couldn't get the downstream TLS cipher protocol");
}
buf.truncate(nwritten);
String::from_utf8(buf).expect("TLS protocol must be valid UTF-8")
};
}
PROTOCOL.as_str()
}
pub fn downstream_original_header_names_with_len(
buf_size: usize,
) -> impl Iterator<Item = Result<String, Error>> {
abi::MultiValueHostcall::new(
b'\0',
buf_size,
move |buf, buf_size, cursor, ending_cursor, nwritten| unsafe {
abi::xqd_req_original_header_names_get(buf, buf_size, cursor, ending_cursor, nwritten)
},
)
.map(|res| {
res.and_then(|name_bytes| {
String::from_utf8(name_bytes.to_vec())
.map_err(|e| anyhow!("invalid header name: {}", e))
})
})
}
pub fn downstream_original_header_names() -> impl Iterator<Item = Result<String, Error>> {
downstream_original_header_names_with_len(crate::HEADER_NAME_MAX_LEN)
}
pub trait RequestExt: Sized {
fn send(self, backend: &str) -> Result<Response<Body>, Error> {
self.inner_to_body()?.send(backend)
}
fn send_async(self, backend: &str) -> Result<PendingRequest, Error> {
self.inner_to_body()?.send_async(backend)
}
fn inner_to_body(self) -> Result<Request<Body>, Error>;
fn inner_to_bytes(self) -> Result<Request<Vec<u8>>, Error>;
fn fastly_metadata(&self) -> &FastlyRequestMetadata;
fn fastly_metadata_mut(&mut self) -> &mut FastlyRequestMetadata;
fn set_cache_override(&mut self, cache_override: CacheOverride) {
self.fastly_metadata_mut().cache_override = cache_override;
}
fn get_cache_override(&self) -> CacheOverride {
self.fastly_metadata().cache_override
}
fn set_pass(&mut self) {
self.fastly_metadata_mut().cache_override.set_pass();
}
fn set_ttl(&mut self, ttl: u32) {
self.fastly_metadata_mut().cache_override.set_ttl(ttl);
}
fn set_stale_while_revalidate(&mut self, swr: u32) {
self.fastly_metadata_mut()
.cache_override
.set_stale_while_revalidate(swr);
}
}
fn request_to_handles(req: Request<Body>) -> Result<(RequestHandle, BodyHandle), Error> {
let mut req_handle = RequestHandle::new()?;
req_handle.set_version(req.version())?;
req_handle.set_method(req.method())?;
req_handle.set_uri(req.uri())?;
for name in req.headers().keys() {
req_handle.set_header_values(name, req.headers().get_all(name))?;
}
let md = req.fastly_metadata();
req_handle.set_cache_override(md.cache_override);
Ok((req_handle, req.into_body().into_handle()?))
}
fn validate_request(req: &Request<Body>, backend: &str) -> Result<(), Error> {
validate_backend(backend)?;
ensure!(
req.uri().scheme().is_some() && req.uri().authority().is_some(),
"request URIs must have a scheme (http/https) and an authority (host)"
);
Ok(())
}
fn get_or_default_metadata(exts: &Extensions) -> &FastlyRequestMetadata {
const DEFAULT: FastlyRequestMetadata = FastlyRequestMetadata::default();
if let Some(md) = exts.get::<FastlyRequestMetadata>() {
md
} else {
&DEFAULT
}
}
fn get_or_insert_metadata(exts: &mut Extensions) -> &mut FastlyRequestMetadata {
if exts.get::<FastlyRequestMetadata>().is_none() {
exts.insert(FastlyRequestMetadata::default());
}
exts.get_mut::<FastlyRequestMetadata>().unwrap()
}
impl RequestExt for Request<Body> {
fn send(self, backend: &str) -> Result<Response<Body>, Error> {
validate_request(&self, backend)?;
let (req_handle, req_body_handle) = request_to_handles(self)?;
let (resp_handle, resp_body_handle) = req_handle.send(req_body_handle, backend)?;
handles_to_response(resp_handle, resp_body_handle)
}
fn send_async(self, backend: &str) -> Result<PendingRequest, Error> {
validate_request(&self, backend)?;
let (req_handle, req_body_handle) = request_to_handles(self)?;
let pending_req_handle = req_handle.send_async(req_body_handle, backend)?;
Ok(pending_req_handle.into())
}
fn inner_to_body(self) -> Result<Request<Body>, Error> {
Ok(self)
}
fn inner_to_bytes(self) -> Result<Request<Vec<u8>>, Error> {
let (parts, body) = self.into_parts();
Ok(Request::from_parts(parts, body.into_bytes()?))
}
fn fastly_metadata(&self) -> &FastlyRequestMetadata {
get_or_default_metadata(self.extensions())
}
fn fastly_metadata_mut(&mut self) -> &mut FastlyRequestMetadata {
get_or_insert_metadata(self.extensions_mut())
}
}
impl RequestExt for Request<&[u8]> {
fn inner_to_body(self) -> Result<Request<Body>, Error> {
let mut body = Body::new()?;
body.write_all(self.body())?;
Ok(self.map(|_| body))
}
fn inner_to_bytes(self) -> Result<Request<Vec<u8>>, Error> {
Ok(self.map(|b| b.to_vec()))
}
fn fastly_metadata(&self) -> &FastlyRequestMetadata {
get_or_default_metadata(self.extensions())
}
fn fastly_metadata_mut(&mut self) -> &mut FastlyRequestMetadata {
get_or_insert_metadata(self.extensions_mut())
}
}
impl RequestExt for Request<Vec<u8>> {
fn inner_to_body(self) -> Result<Request<Body>, Error> {
let mut body = Body::new()?;
body.write_all(self.body())?;
Ok(self.map(|_| body))
}
fn inner_to_bytes(self) -> Result<Request<Vec<u8>>, Error> {
Ok(self)
}
fn fastly_metadata(&self) -> &FastlyRequestMetadata {
get_or_default_metadata(self.extensions())
}
fn fastly_metadata_mut(&mut self) -> &mut FastlyRequestMetadata {
get_or_insert_metadata(self.extensions_mut())
}
}
impl RequestExt for Request<&str> {
fn inner_to_body(self) -> Result<Request<Body>, Error> {
let mut body = Body::new()?;
body.write_all(self.body().as_bytes())?;
Ok(self.map(|_| body))
}
fn inner_to_bytes(self) -> Result<Request<Vec<u8>>, Error> {
Ok(self.map(|b| b.as_bytes().to_vec()))
}
fn fastly_metadata(&self) -> &FastlyRequestMetadata {
get_or_default_metadata(self.extensions())
}
fn fastly_metadata_mut(&mut self) -> &mut FastlyRequestMetadata {
get_or_insert_metadata(self.extensions_mut())
}
}
impl RequestExt for Request<String> {
fn inner_to_body(self) -> Result<Request<Body>, Error> {
let mut body = Body::new()?;
body.write_all(self.body().as_bytes())?;
Ok(self.map(|_| body))
}
fn inner_to_bytes(self) -> Result<Request<Vec<u8>>, Error> {
Ok(self.map(|b| b.into_bytes()))
}
fn fastly_metadata(&self) -> &FastlyRequestMetadata {
get_or_default_metadata(self.extensions())
}
fn fastly_metadata_mut(&mut self) -> &mut FastlyRequestMetadata {
get_or_insert_metadata(self.extensions_mut())
}
}
impl RequestExt for Request<()> {
fn inner_to_body(self) -> Result<Request<Body>, Error> {
let body = Body::new()?;
Ok(self.map(|_| body))
}
fn inner_to_bytes(self) -> Result<Request<Vec<u8>>, Error> {
Ok(self.map(|_| vec![]))
}
fn fastly_metadata(&self) -> &FastlyRequestMetadata {
get_or_default_metadata(self.extensions())
}
fn fastly_metadata_mut(&mut self) -> &mut FastlyRequestMetadata {
get_or_insert_metadata(self.extensions_mut())
}
}
pub trait RequestBuilderExt: Sized {
fn fastly_metadata_ref(&self) -> Option<&FastlyRequestMetadata>;
fn fastly_metadata_mut(&mut self) -> Option<&mut FastlyRequestMetadata>;
fn cache_override(mut self, cache_override: CacheOverride) -> Self {
self.fastly_metadata_mut()
.map(|md| md.cache_override = cache_override);
self
}
fn cache_override_ref(&self) -> Option<&CacheOverride> {
self.fastly_metadata_ref().map(|md| &md.cache_override)
}
fn cache_override_mut(&mut self) -> Option<&mut CacheOverride> {
self.fastly_metadata_mut().map(|md| &mut md.cache_override)
}
fn pass(mut self) -> Self {
self.cache_override_mut().map(|co| co.set_pass());
self
}
fn ttl(mut self, ttl: u32) -> Self {
self.cache_override_mut().map(|co| co.set_ttl(ttl));
self
}
fn stale_while_revalidate(mut self, swr: u32) -> Self {
self.cache_override_mut()
.map(|co| co.set_stale_while_revalidate(swr));
self
}
}
impl RequestBuilderExt for http::request::Builder {
fn fastly_metadata_ref(&self) -> Option<&FastlyRequestMetadata> {
self.extensions_ref().map(get_or_default_metadata)
}
fn fastly_metadata_mut(&mut self) -> Option<&mut FastlyRequestMetadata> {
self.extensions_mut().map(get_or_insert_metadata)
}
}
#[derive(Debug, Eq, Hash, PartialEq)]
#[repr(transparent)]
pub struct PendingRequestHandle {
handle: u32,
}
impl From<PendingRequest> for PendingRequestHandle {
fn from(pr: PendingRequest) -> Self {
pr.handle
}
}
impl PendingRequestHandle {
pub(crate) const INVALID: Self = Self {
handle: fastly_shared::INVALID_PENDING_REQUEST_HANDLE,
};
pub(crate) fn is_invalid(&self) -> bool {
self == &Self::INVALID
}
pub(crate) unsafe fn handle(&self) -> Self {
Self {
handle: self.handle,
}
}
pub fn poll(self) -> PollHandleResult {
let mut is_done = -1;
let mut resp_handle = ResponseHandle::INVALID;
let mut body_handle = BodyHandle::INVALID;
let status = unsafe {
abi::xqd_pending_req_poll(
self.handle(),
&mut is_done,
&mut resp_handle,
&mut body_handle,
)
};
if status.is_err() || is_done < 0 || is_done > 1 {
panic!("xqd_pending_req_poll internal error");
}
let is_done = if is_done == 0 { false } else { true };
if !is_done {
return PollHandleResult::Pending(self);
}
if is_done && (resp_handle.is_invalid() || body_handle.is_invalid()) {
PollHandleResult::Done(Err(Error::msg("asynchronous request failed")))
} else {
PollHandleResult::Done(Ok((resp_handle, body_handle)))
}
}
pub fn wait(self) -> Result<(ResponseHandle, BodyHandle), Error> {
let mut resp_handle = ResponseHandle::INVALID;
let mut body_handle = BodyHandle::INVALID;
let status =
unsafe { abi::xqd_pending_req_wait(self.handle(), &mut resp_handle, &mut body_handle) };
if status.is_err() || resp_handle.is_invalid() || body_handle.is_invalid() {
return Err(Error::msg("xqd_pending_req_poll failed"));
}
Ok((resp_handle, body_handle))
}
}
pub enum PollHandleResult {
Pending(PendingRequestHandle),
Done(Result<(ResponseHandle, BodyHandle), Error>),
}
pub fn select_handles<I>(
pending_reqs: I,
) -> (
Result<(ResponseHandle, BodyHandle), Error>,
usize,
Vec<PendingRequestHandle>,
)
where
I: IntoIterator<Item = PendingRequestHandle>,
{
let mut prs = pending_reqs
.into_iter()
.collect::<Vec<PendingRequestHandle>>();
if prs.is_empty() || prs.len() > fastly_shared::MAX_PENDING_REQS as usize {
panic!(
"the number of selected handles must be at least 1, and less than {}",
fastly_shared::MAX_PENDING_REQS
);
}
let mut done_index = -1;
let mut resp_handle = ResponseHandle::INVALID;
let mut body_handle = BodyHandle::INVALID;
let status = unsafe {
abi::xqd_pending_req_select(
prs.as_ptr(),
prs.len(),
&mut done_index,
&mut resp_handle,
&mut body_handle,
)
};
if status.is_err() || done_index < 0 {
panic!("xqd_pending_req_select internal error");
}
let done_index = done_index
.try_into()
.expect("xqd_pending_req_select returned an invalid index");
prs.swap_remove(done_index);
let res = if resp_handle.is_invalid() || body_handle.is_invalid() {
Err(Error::msg("selected request failed"))
} else {
Ok((resp_handle, body_handle))
};
(res, done_index, prs)
}
pub struct PendingRequest {
handle: PendingRequestHandle,
}
impl From<PendingRequestHandle> for PendingRequest {
fn from(handle: PendingRequestHandle) -> Self {
Self { handle }
}
}
impl PendingRequest {
pub fn poll(self) -> PollResult {
match self.handle.poll() {
PollHandleResult::Pending(prh) => PollResult::Pending(prh.into()),
PollHandleResult::Done(Ok((resp_handle, resp_body_handle))) => {
PollResult::Done(handles_to_response(resp_handle, resp_body_handle))
}
PollHandleResult::Done(Err(e)) => PollResult::Done(Err(e)),
}
}
pub fn wait(self) -> Result<Response<Body>, Error> {
let (resp_handle, resp_body_handle) = self.handle.wait()?;
handles_to_response(resp_handle, resp_body_handle)
}
}
pub enum PollResult {
Pending(PendingRequest),
Done(Result<Response<Body>, Error>),
}
pub fn select<I>(pending_reqs: I) -> (Result<Response<Body>, Error>, usize, Vec<PendingRequest>)
where
I: IntoIterator<Item = PendingRequest>,
{
let (res, done_index, rest) = select_handles(pending_reqs.into_iter().map(Into::into));
let res = match res {
Ok((resp_handle, resp_body_handle)) => handles_to_response(resp_handle, resp_body_handle),
Err(e) => Err(e),
};
let rest = rest.into_iter().map(Into::into).collect();
(res, done_index, rest)
}