easyfibers
easyfibers is a closure-less couroutine library for executing asynchronous tasks as painlessly as possible. It is a small layer on top of mio and context-rs.
Description
easyfibers allows one to write code as if it used blocking sockets and does not require putting your code in awkward closures. It will seamlessly poll and schedule fibers on read, write and accept function calls.
Vision
Have easyfibers stacks run protocols, have client code on main stack (using Fiber::join_main).
This way:
-
Protocol implementations can be done efficiently and in a straightforward way.
-
Users of protocol clients and servers have no issues with stack size.
-
Users are free to implement their code whichever way they want outside of any framework restrictions; without forcing their code into callbacks or closures.
Warning
Eeach fiber is executed in its own stack. These stacks are much more limited and one must be careful as to not go over limit (as it will kill your app with a SIGBUS).
Is the risk worth it? I think so. Given the ease of use compared to other coroutine/fiber libraries. I disagree with the (ab)use of clousures of other libraries.
Heavy use of closures makes the code ugly, produces awful compile errors and makes it hard to integrate with the rest of your code.
TODO
Example - random http/1.1 proxy
Uses 3 types of fibers:
-
TcpListener that accepts connections.
-
TcpStream server that receives request and spawns a http client fiber.
-
TcpStream client with two roles:
Run the bottom example from one terminal:
cargo test -- --nocapture
extern crate easyfibers;
extern crate rand;
use easyfibers::*;
use mio::net::{TcpStream,TcpListener};
use std::io::{Write,Read};
use std::time::{Duration};
use std::net::{SocketAddr,Ipv4Addr,IpAddr};
use std::str;
use native_tls::{TlsConnector};
use std::io;
#[derive(Clone)]
struct Param {
chosen: Option<String>,
is_https: bool,
proxy_client: bool,
http_hosts: Vec<String>,
https_hosts: Vec<String>,
}
#[derive(PartialEq)]
enum Resp<'a> {
Done,
Bytes(&'a[u8])
}
fn get_http(mut fiber: Fiber<Param,Resp>, p: Param) -> Option<Resp> {
let mut v = [0u8;2000];
let host = p.chosen.unwrap();
if p.is_https {
let connector = TlsConnector::builder().unwrap().build().unwrap();
fiber.tcp_tls_connect(connector, host.as_str()).unwrap();
fiber.socket_timeout(Some(Duration::from_millis(2000)));
} else {
fiber.socket_timeout(Some(Duration::from_millis(1000)));
};
let req = format!("GET / HTTP/1.1\r\nHost: {}\r\nConnection: keep-alive\r\nUser-Agent: test\r\n\r\n",host);
fiber.write(req.as_bytes()).expect("Can not write to socket");
loop {
match fiber.read(&mut v[..]) {
Ok(sz) => {
fiber.resp_chunk(Resp::Bytes(&v[0..sz]));
}
Err(e) => {
assert_eq!(e.kind(), io::ErrorKind::TimedOut);
break;
}
}
}
println!("Client fiber closing {}", p.proxy_client);
Some(Resp::Done)
}
fn rand_http_proxy(mut fiber: Fiber<Param,Resp>, p: Param) -> Option<Resp> {
fiber.socket_timeout(Some(Duration::from_millis(500)));
let chosen = rand::random::<usize>() % p.http_hosts.len();
let port = if rand::random::<u8>() % 2 == 0 { 80 } else { 443 };
let p1 = if port == 443 {
Param {
chosen: Some(p.https_hosts[chosen].clone()),
is_https: port == 443,
http_hosts: Vec::new(),
https_hosts: Vec::new(),
proxy_client: true,
}
} else {
Param {
chosen: Some(p.http_hosts[chosen].clone()),
is_https: port == 443,
http_hosts: Vec::new(),
https_hosts: Vec::new(),
proxy_client: true,
}
};
let addr = if let &Some(ref ch) = &p1.chosen {
ch.clone()
} else {
panic!("")
};
fiber.join_resolve_connect(addr.as_str(), SocketType::Tcp, port, Duration::from_millis(3000), get_http, p1).unwrap();
println!("Returning: {}{}", if port == 443 { "https://" } else { "http://" }, addr);
while let Some(resp) = fiber.get_child() {
if let Resp::Bytes(slice) = resp {
println!("Server got {}", slice.len());
fiber.write(slice);
}
}
println!("Server socket fiber closing");
None
}
fn sock_acceptor(fiber: Fiber<Param,Resp>, p: Param) -> Option<Resp> {
loop {
match fiber.accept_tcp() {
Ok((sock,_)) => {
fiber.new_tcp(sock,rand_http_proxy, p.clone()).unwrap();
}
_ => {
println!("Listen socket error");
break;
}
}
}
None
}
fn main() {
rand::random::<u8>();
let p = Param {
chosen: None,
is_https: false,
proxy_client: false,
http_hosts: vec!["www.liquiddota.com".to_string(),"www.google.com".to_string(),
"www.sqlite.org".to_string(),"edition.cnn.com".to_string()],
https_hosts: vec!["www.reddit.com".to_string(), "www.google.com".to_string(),
"arstechnica.com".to_string(), "news.ycombinator.com".to_string()],
};
let poller:Poller = Poller::new(Some(4096*10)).unwrap();
let runner:Runner<Param,Resp> = Runner::new().unwrap();
let listener = TcpListener::bind(&"127.0.0.1:10000".parse().unwrap()).unwrap();
runner.new_listener(listener, sock_acceptor, p).unwrap();
let mut reqs_remain = 20;
for _ in 0..reqs_remain {
let p = Param {
chosen: Some("127.0.0.1:10000".to_string()),
is_https: false,
proxy_client: false,
http_hosts: Vec::new(),
https_hosts: Vec::new(),
};
let addr = IpAddr::V4(Ipv4Addr::new(127,0,0,1));
let client_sock = TcpStream::connect(&SocketAddr::new(addr, 10000)).unwrap();
runner.new_tcp(client_sock, get_http, p).unwrap();
}
while reqs_remain > 0 {
if !poller.poll(Duration::from_millis(10)) {
continue;
}
if !runner.run() {
continue;
}
while let Some(r) = runner.get_response() {
if Resp::Done == r {
reqs_remain -= 1;
println!("Finished executing, req_remain: {}", reqs_remain);
} else if let Resp::Bytes(slice) = r {
println!("Main stack got {} bytes", slice.len());
}
}
while let Some(_) = runner.get_fiber() {
}
}
println!("poll out");
}