use std::collections::HashMap;
use std::pin::Pin;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
};
use std::task::{Context, Poll};
use futures::Future;
use hyper::body;
use hyper::{Body, Request};
use url::Url;
use lua_actor::actor::Actor;
use lua_actor::message::LuaMessage;
use rlua;
/// Calls `hyper_request` on `actor`, passing a message that describes
/// `request`, and returns whatever the actor call returns.
///
/// The message is built by [`get_hyper_request_lua_message`]; the request
/// body is not read here.
///
/// # Errors
///
/// Propagates the `rlua::Error` returned by `Actor::call`.
#[inline]
pub fn call_hyper_request(
    actor: &mut Actor,
    request: &mut Request<Body>,
) -> Result<LuaMessage, rlua::Error> {
    let message = get_hyper_request_lua_message(request);
    actor.call("hyper_request", message)
}
/// Fire-and-forget variant of [`call_hyper_request`].
///
/// Builds the request message with [`get_hyper_request_lua_message`] and hands
/// it to `Actor::call_nowait` under the name `hyper_request`. Whatever
/// `call_nowait` returns is intentionally discarded.
#[inline]
pub fn call_hyper_request_nowait(
    actor: &Actor,
    request: &mut Request<Body>,
) {
    let message = get_hyper_request_lua_message(request);
    // The caller does not wait for an outcome, so the return value is dropped.
    let _ = actor.call_nowait("hyper_request", message);
}
/// Reads the complete body of `request` and registers a function named
/// `hyper_get_request_body` on `actor` that returns it.
///
/// * `use_raw_data == true`: the registered function returns the body as raw
///   bytes.
/// * `use_raw_data == false`: the registered function returns the body as a
///   string. Invalid UTF-8 sequences are replaced with U+FFFD instead of
///   aborting the request.
///
/// If reading the body fails, the function is registered with an empty body
/// so a previously registered body is never left in place.
///
/// # Panics
///
/// Panics if the mutex guarding the actor's Lua state is poisoned.
#[inline]
pub async fn setup_hyper_get_request_body(
    actor: &Actor,
    request: &mut Request<Body>,
    use_raw_data: bool,
) {
    // Drain the body BEFORE taking the Lua lock. A `std::sync::Mutex` guard
    // must not be held across an `.await`: it would keep the Lua state locked
    // for as long as the client takes to send the body.
    let bytes_vec = body::to_bytes(request.body_mut())
        .await
        .map(|bytes| bytes.to_vec())
        // The body is client-controlled I/O; a failed read must not panic.
        .unwrap_or_default();

    let lua_arc = actor.lua();
    let lua = lua_arc.lock().unwrap();
    if use_raw_data {
        lua.context(move |lua| {
            let _ = actor.def_fn_with_name_sync(
                lua,
                move |_, _input: rlua::Value| Ok(bytes_vec.clone()),
                "hyper_get_request_body",
            );
        });
    } else {
        // Lossy decoding: a panic here would happen while the guard is held
        // and poison the Lua mutex for every later request.
        let body_str = String::from_utf8_lossy(&bytes_vec).into_owned();
        lua.context(move |lua| {
            let _ = actor.def_fn_with_name_sync(
                lua,
                move |_, _input: rlua::Value| Ok(body_str.clone()),
                "hyper_get_request_body",
            );
        });
    }
}
/// Builds the message describing `request` that is passed to the actor.
///
/// The result wraps a string-keyed map with these entries:
///
/// * `headers` — output of [`convert_headers`]
/// * `url_meta` — output of [`convert_url`]
/// * `method` — the request method rendered with `to_string()`
/// * `version` — the HTTP version rendered with its `Debug` formatting
/// * `uri` — the request URI rendered with `to_string()`
/// * `extensions` — the request extensions rendered with `Debug` formatting
///
/// The request body is not touched.
#[inline]
pub fn get_hyper_request_lua_message(request: &Request<Body>) -> LuaMessage {
    let entries = vec![
        ("headers", LuaMessage::from(convert_headers(request))),
        ("url_meta", LuaMessage::from(convert_url(request))),
        ("method", LuaMessage::from(request.method().to_string())),
        ("version", LuaMessage::from(format!("{:?}", request.version()))),
        ("uri", LuaMessage::from(request.uri().to_string())),
        (
            "extensions",
            LuaMessage::from(format!("{:?}", request.extensions())),
        ),
    ];

    let table: HashMap<String, LuaMessage> = entries
        .into_iter()
        .map(|(key, value)| (key.to_string(), value))
        .collect();
    LuaMessage::from(table)
}
/// Converts the headers of `request` into a list of `[name, value]` pairs.
///
/// Each header becomes a two-element list whose first element is the header
/// name. The second element is the header value as a string when
/// `HeaderValue::to_str` succeeds; otherwise it is a list containing the
/// value's raw bytes, one message per byte.
#[inline]
pub fn convert_headers(request: &Request<Body>) -> Vec<LuaMessage> {
    request
        .headers()
        .iter()
        .map(|(name, value)| {
            let rendered = match value.to_str() {
                Ok(text) => LuaMessage::from(text),
                // Not representable as a string: pass the bytes through.
                Err(_) => LuaMessage::from(
                    value
                        .as_bytes()
                        .iter()
                        .map(|byte| LuaMessage::from(*byte))
                        .collect::<Vec<_>>(),
                ),
            };
            LuaMessage::from(vec![LuaMessage::from(name.as_str()), rendered])
        })
        .collect()
}
/// Extracts URL metadata from `request`.
///
/// The returned map contains:
///
/// * `url_raw` — the request URI rendered with `to_string()`
/// * `query_params` — a list of `[key, value]` pairs decoded from the query
///   string (an empty list when there is no query)
///
/// Requests received by a server normally carry an origin-form target such as
/// `/path?x=1`. That is a relative reference, so parsing it as an absolute
/// URL fails. In that case the target is resolved against a placeholder base
/// so the query string can still be decoded; the placeholder host never
/// appears in the output. `query_params` is omitted only if the target cannot
/// be parsed either way.
#[inline]
pub fn convert_url(request: &Request<Body>) -> HashMap<String, LuaMessage> {
    let mut data = HashMap::<String, LuaMessage>::default();
    let raw_url = request.uri().to_string();

    // Absolute targets parse directly; relative ones need a base to join onto.
    let parsed = Url::parse(raw_url.as_str()).or_else(|_| {
        Url::parse("http://localhost/").and_then(|base| base.join(raw_url.as_str()))
    });

    if let Ok(parsed_url) = parsed {
        let query_params = parsed_url
            .query_pairs()
            .into_owned()
            .map(|(key, value)| {
                LuaMessage::from(vec![LuaMessage::from(key), LuaMessage::from(value)])
            })
            .collect::<Vec<_>>();
        data.insert("query_params".to_string(), LuaMessage::from(query_params));
    }

    data.insert("url_raw".to_string(), LuaMessage::from(raw_url));
    data
}
/// A cloneable one-shot latch that can be awaited.
///
/// All clones share the same state. Awaiting any clone completes once
/// [`HyperLatch::mark_done`] has been called on any clone; awaiting after that
/// completes immediately.
#[derive(Debug, Clone)]
pub struct HyperLatch {
    /// `true` until `mark_done` is called on any clone.
    is_alive: Arc<AtomicBool>,
    /// Wakers of tasks that polled the latch while it was still alive.
    wakers: Arc<Mutex<Vec<std::task::Waker>>>,
}

impl HyperLatch {
    /// Releases the latch and wakes every task currently waiting on it.
    ///
    /// Calling this more than once is harmless.
    pub fn mark_done(&self) {
        // Publish the state change first so a woken task observes it.
        self.is_alive.store(false, Ordering::SeqCst);
        // Take the list out so the lock is not held while waking tasks.
        let parked = std::mem::take(&mut *self.lock_wakers());
        for waker in parked {
            waker.wake();
        }
    }

    /// Locks the waker list, recovering the data if the mutex was poisoned.
    fn lock_wakers(&self) -> std::sync::MutexGuard<'_, Vec<std::task::Waker>> {
        self.wakers
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }
}

impl Default for HyperLatch {
    /// Creates a latch that is still alive (awaiting it will block).
    fn default() -> Self {
        HyperLatch {
            is_alive: Arc::new(AtomicBool::new(true)),
            wakers: Arc::new(Mutex::new(Vec::new())),
        }
    }
}

impl Future for HyperLatch {
    type Output = ();

    /// Returns `Ready` once the latch has been released. Otherwise registers
    /// the current task's waker so `mark_done` can wake it, and returns
    /// `Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if !self.is_alive.load(Ordering::SeqCst) {
            return Poll::Ready(());
        }

        {
            let mut wakers = self.lock_wakers();
            // Skip the push when this task is already registered, so repeated
            // polls do not grow the list.
            if !wakers.iter().any(|known| known.will_wake(cx.waker())) {
                wakers.push(cx.waker().clone());
            }
        }

        // Re-check after registering: `mark_done` may have run between the
        // first check and the registration, and its wake-up would be lost.
        if self.is_alive.load(Ordering::SeqCst) {
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}