extern crate gluon;
extern crate gluon_base as base;
#[macro_use]
extern crate gluon_vm as vm;
#[macro_use]
extern crate collect_mac;
extern crate env_logger;
extern crate futures;
extern crate hyper;
#[macro_use]
extern crate log;
use std::env;
use std::fmt;
use std::error::Error as StdError;
use std::fs::File;
use std::io::{stderr, Read, Write};
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use hyper::{Chunk, Method, StatusCode};
use hyper::server::Service;
use futures::Async;
use futures::future::Future;
use futures::sink::Sink;
use futures::stream::Stream;
use futures::sync::mpsc::Sender;
use base::types::{ArcType, Type};
use vm::{Error as VmError, Result as VmResult};
use vm::thread::ThreadInternal;
use vm::thread::{Context, RootedThread, Thread};
use vm::Variants;
use vm::api::{Function, FunctionRef, FutureResult, Getable, OpaqueValue, PushAsRef, Pushable,
Userdata, ValueRef, VmType, WithVM, IO};
use vm::gc::{Gc, Traverseable};
use vm::internal::Value;
use gluon::{new_vm, Compiler};
struct Handler<T>(PhantomData<T>);
impl<T: VmType + 'static> VmType for Handler<T> {
type Type = Self;
fn make_type(vm: &Thread) -> ArcType {
let typ = (*vm.global_env()
.get_env()
.find_type_info("examples.http_types.Handler")
.unwrap())
.clone()
.into_type();
Type::app(typ, collect![T::make_type(vm)])
}
}
struct Wrap<T>(T);
macro_rules! define_vmtype {
($name: ident) => {
impl VmType for Wrap<$name> {
type Type = $name;
fn make_type(vm: &Thread) -> ArcType {
let typ = concat!("examples.http_types.", stringify!($name));
(*vm.global_env().get_env().find_type_info(typ).unwrap())
.clone()
.into_type()
}
}
}
}
define_vmtype! { Method }
impl<'vm> Pushable<'vm> for Wrap<Method> {
fn push(self, _: &'vm Thread, context: &mut Context) -> VmResult<()> {
use hyper::Method::*;
context.stack.push(Value::Tag(match self.0 {
Get => 0,
Post => 1,
Delete => 2,
_ => {
return Err(
VmError::Message(format!("Method `{:?}` does not exist in gluon", self.0))
.into(),
)
}
}));
Ok(())
}
}
define_vmtype! { StatusCode }
impl<'vm> Getable<'vm> for Wrap<StatusCode> {
fn from_value(_: &'vm Thread, value: Variants) -> Option<Self> {
use hyper::StatusCode::*;
match value.as_ref() {
ValueRef::Data(data) => Some(Wrap(match data.tag() {
0 => Ok,
1 => NotFound,
2 => InternalServerError,
_ => panic!("Unexpected tag"),
})),
_ => panic!(),
}
}
}
pub struct Body(
Arc<Mutex<Box<Stream<Item = PushAsRef<Chunk, [u8]>, Error = VmError> + Send + 'static>>>,
);
impl Userdata for Body {}
impl fmt::Debug for Body {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "hyper::Body")
}
}
impl Traverseable for Body {
fn traverse(&self, _: &mut Gc) {}
}
impl VmType for Body {
type Type = Self;
}
fn read_chunk(
body: &Body,
) -> FutureResult<
Box<Future<Item = IO<Option<PushAsRef<Chunk, [u8]>>>, Error = VmError> + Send + 'static>,
> {
use futures::future::poll_fn;
let body = body.0.clone();
FutureResult(Box::new(poll_fn(move || {
let mut stream = body.lock().unwrap();
stream.poll().map(|async| async.map(IO::Value))
})))
}
pub struct ResponseBody(Arc<Mutex<Option<Sender<Result<Chunk, hyper::Error>>>>>);
impl fmt::Debug for ResponseBody {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "hyper::Response")
}
}
impl Userdata for ResponseBody {}
impl Traverseable for ResponseBody {
fn traverse(&self, _: &mut Gc) {}
}
impl VmType for ResponseBody {
type Type = Self;
}
fn write_response(
response: &ResponseBody,
bytes: &[u8],
) -> FutureResult<Box<Future<Item = IO<()>, Error = VmError> + Send + 'static>> {
use futures::future::poll_fn;
use futures::AsyncSink;
let mut unsent_chunk = Some(Ok(bytes.to_owned().into()));
let response = response.0.clone();
FutureResult(Box::new(poll_fn(move || {
info!("Starting response send");
let mut sender = response.lock().unwrap();
let sender = sender
.as_mut()
.expect("Sender has been dropped while still in use");
if let Some(chunk) = unsent_chunk.take() {
match sender.start_send(chunk) {
Ok(AsyncSink::NotReady(chunk)) => {
unsent_chunk = Some(chunk);
return Ok(Async::NotReady);
}
Ok(AsyncSink::Ready) => (),
Err(_) => {
info!("Could not send http response");
return Ok(Async::Ready(IO::Value(())));
}
}
}
match sender.poll_complete() {
Ok(async) => Ok(async.map(IO::Value)),
Err(_) => {
info!("Could not send http response");
Ok(Async::Ready(IO::Value(())))
}
}
})))
}
field_decl! { method, uri, status, body, request, response }
type Request = record_type!{
method => Wrap<Method>,
uri => String,
body => Body
};
type Response = record_type!{
status => Wrap<StatusCode>
};
type HttpState = record_type!{
request => Request,
response => ResponseBody
};
fn listen(port: i32, value: WithVM<OpaqueValue<RootedThread, Handler<Response>>>) -> IO<()> {
let WithVM {
value: handler,
vm: thread,
} = value;
use hyper::server::{Http, Request as HyperRequest, Response as HyperResponse};
type ListenFn = fn(OpaqueValue<RootedThread, Handler<Response>>, HttpState)
-> IO<Response>;
let handle: Function<RootedThread, ListenFn> = thread
.get_global("examples.http.handle")
.unwrap_or_else(|err| panic!("{}", err));
struct Listen {
handle: Function<RootedThread, ListenFn>,
handler: OpaqueValue<RootedThread, Handler<Response>>,
}
impl Service for Listen {
type Request = HyperRequest;
type Response = HyperResponse;
type Error = hyper::Error;
type Future = Box<Future<Item = HyperResponse, Error = hyper::Error> + Send + 'static>;
fn call(&self, request: HyperRequest) -> Self::Future {
let gluon_request = record_no_decl! {
method => Wrap(request.method().clone()),
uri => request.uri().to_string(),
body => Body(Arc::new(Mutex::new(Box::new(request.body()
.map_err(|err| VmError::Message(format!("{}", err)))
.map(PushAsRef::<_, [u8]>::new)))))
};
let (response_sender, response_body) = hyper::Body::pair();
let response_sender = Arc::new(Mutex::new(Some(response_sender)));
let http_state = record_no_decl!{
request => gluon_request,
response => ResponseBody(response_sender.clone())
};
Box::new(
self.handle
.clone()
.call_async(self.handler.clone(), http_state)
.then(move |result| match result {
Ok(value) => {
match value {
IO::Value(record_p!{ status }) => {
*response_sender.lock().unwrap() = None;
Ok(
HyperResponse::new()
.with_status(status.0)
.with_body(response_body),
)
}
IO::Exception(err) => {
let _ = stderr().write(err.as_bytes());
Ok(
HyperResponse::new()
.with_status(StatusCode::InternalServerError),
)
}
}
}
Err(err) => {
let _ = stderr().write(format!("{}", err).as_bytes());
Ok(HyperResponse::new().with_status(StatusCode::InternalServerError))
}
}),
)
}
}
let addr = format!("127.0.0.1:{}", port).parse().unwrap();
let result = Http::new()
.bind(&addr, move || {
Ok(Listen {
handle: handle.clone(),
handler: handler.clone(),
})
})
.and_then(|server| server.run());
match result {
Ok(()) => IO::Value(()),
Err(err) => IO::Exception(format!("{}", err)),
}
}
pub fn load_types(vm: &Thread) -> VmResult<()> {
vm.register_type::<Body>("Body", &[])?;
vm.register_type::<ResponseBody>("ResponseBody", &[])?;
Ok(())
}
pub fn load(vm: &Thread) -> VmResult<()> {
vm.define_global(
"http_prim",
record! {
listen => primitive!(2 listen),
read_chunk => primitive!(1 read_chunk),
write_response => primitive!(2 write_response)
},
)?;
Ok(())
}
fn main() {
if let Err(err) = main_() {
panic!("{}", err)
}
}
fn main_() -> Result<(), Box<StdError>> {
let _ = env_logger::init();
let port = env::args()
.nth(1)
.map(|port| port.parse::<i32>().expect("port"))
.unwrap_or(80);
let thread = new_vm();
load_types(&thread)?;
Compiler::new().run_expr::<()>(
&thread,
"",
r#"let _ = import! "examples/http_types.glu" in () "#,
)?;
load(&thread)?;
let mut expr = String::new();
{
let mut file = File::open("examples/http_server.glu")?;
file.read_to_string(&mut expr)?;
}
let (mut listen, _) =
Compiler::new().run_expr::<FunctionRef<fn(i32) -> IO<()>>>(&thread, "http_test", &expr)?;
listen.call(port)?;
Ok(())
}