fibers/lib.rs
1// Copyright (c) 2016 DWANGO Co., Ltd. All Rights Reserved.
2// See the LICENSE file at the top-level directory of this distribution.
3
4//! This is a library to execute a number of lightweight asynchronous tasks (a.k.a, fibers).
5//!
6//! Note that `fibers` heavily uses [futures][futures] to
7//! represent asynchronous task. If you are not familiar with it,
8//! we recommend that you refer the `README.md` and `TUTORIAL.md` of [futures][futures]
9//! before reading the following.
10//!
11//! This library also uses [mio](https://github.com/carllerche/mio) to achieve
12//! efficient asynchronous I/O handling (mainly for networking primitives).
13//! However, its existence is hidden from the user, so you do not usually have to worry about it.
14//!
15//! [futures]: https://github.com/alexcrichton/futures-rs
16//!
17//! ---
18//!
19//! `Future` is an excellent way to represent asynchronous task.
20//! It is intuitive, easily composed with other futures to represent a complicated task,
21//! without runtime overhead.
22//! But, there is a remaining problem that
23//! "How to efficiently execute (possibility a very large amount of) concurrent tasks?".
24//! `fibers` is an answer to the problem.
25//!
26//! Conceptually, the responsibility of `fibers` is very simple.
27//! It represents an asynchronous task (a.k.a., fiber) as a future instance.
28//! And there is an executor that takes futures and executes them like following.
29//!
30//! ```ignore
31//! // Creates an executor.
32//! let mut executor = ThreadPoolExecutor::new().unwrap();
33//!
34//! // Spawns fibers (i.e., passes futures to the executor).
35//! executor.spawn(futures::lazy(|| { println!("Hello"); Ok(())} ));
36//! executor.spawn(futures::lazy(|| { println!("World!"); Ok(())} ));
37//!
38//! // Executes them.
39//! executor.run().unwrap();
40//! ```
41//!
42//! Fibers may be run on different background threads, but the user does not need to notice it.
43//! If it runs on machines with a large number of processors, performance will improve naturally.
44//!
45//! Roughly speaking, if a future returns `Async::NotReady` response to
46//! a call of `Future::poll` method,
47//! the fiber associated with the future will move into the "waiting" state.
48//! Then, it is suspended (descheduled) until any event in which the future is interested happens
49//! (e.g., waits until data is arrived on a target TCP socket).
50//! Finally, if a future returns `Async::Ready` response,
51//! the fiber will be regarded as completed and the executor will drop the fiber.
52//!
53//! This library provides primitives for writing programs in an efficient
54//! asynchronous fashion (See documentations of [net](net/index.html),
55//! [sync](sync/index.html), [io](io/index.html), [time](time/index.html) modules for more details).
56//!
57//! The main concern of this library is "how to execute fibers".
58//! So it is preferred to use external crates (e.g., [`handy_async`][handy_async])
59//! to describe "how to represent asynchronous tasks".
60//!
61//! [handy_async]: https://github.com/sile/handy_async
62//!
63//! Examples
64//! ========
65//!
66//! The following are examples of writing code to perform asynchronous tasks.
67//!
68//! Other examples are found in "fibers/examples" directory.
69//! And you can run an example by executing the following command.
70//!
71//! ```bash
72//! $ cargo run --example ${EXAMPLE_NAME}
73//! ```
74//!
75//! ### Calculation of fibonacci numbers
76//!
77//! ```
78//! # extern crate fibers;
79//! # extern crate futures;
80//! use fibers::{Spawn, Executor, ThreadPoolExecutor};
81//! use futures::Future;
82//!
83//! fn fibonacci<H: Spawn + Clone>(n: usize, handle: H) -> Box<dyn Future<Item=usize, Error=()> + Send> {
84//! if n < 2 {
85//! Box::new(futures::finished(n))
86//! } else {
87//! // Spawns a new fiber per recursive call.
88//! let f0 = handle.spawn_monitor(fibonacci(n - 1, handle.clone()));
89//! let f1 = handle.spawn_monitor(fibonacci(n - 2, handle.clone()));
90//! Box::new(f0.join(f1).map(|(a0, a1)| a0 + a1).map_err(|_| ()))
91//! }
92//! }
93//!
94//! // Creates an executor instance.
95//! let mut executor = ThreadPoolExecutor::new().unwrap();
96//!
97//! // Creates a future which will calculate the fibonacchi number of `10`.
98//! let input_number = 10;
99//! let future = fibonacci(input_number, executor.handle());
100//!
101//! // Spawns and executes the future (fiber).
102//! let monitor = executor.spawn_monitor(future);
103//! let answer = executor.run_fiber(monitor).unwrap();
104//!
105//! // Checkes the answer.
106//! assert_eq!(answer, Ok(55));
107//! ```
108//!
109//! ### TCP Echo Server
110//!
111//! An example of TCP echo server listening at the address "127.0.0.1:3000":
112//!
113//! ```no_run
114//! # extern crate fibers;
115//! # extern crate futures;
116//! # extern crate handy_async;
117//! use std::io;
118//! use fibers::{Spawn, Executor, ThreadPoolExecutor};
119//! use fibers::net::TcpListener;
120//! use futures::{Future, Stream};
121//! use handy_async::io::{AsyncWrite, ReadFrom};
122//! use handy_async::pattern::AllowPartial;
123//!
124//! let server_addr = "127.0.0.1:3000".parse().expect("Invalid TCP bind address");
125//!
126//! let mut executor = ThreadPoolExecutor::new().expect("Cannot create Executor");
127//! let handle0 = executor.handle();
128//! let monitor = executor.spawn_monitor(TcpListener::bind(server_addr)
129//! .and_then(move |listener| {
130//! println!("# Start listening: {}: ", server_addr);
131//!
132//! // Creates a stream of incoming TCP client sockets
133//! listener.incoming().for_each(move |(client, addr)| {
134//! // New client is connected.
135//! println!("# CONNECTED: {}", addr);
136//! let handle1 = handle0.clone();
137//!
138//! // Spawns a fiber to handle the client.
139//! handle0.spawn(client.and_then(move |client| {
140//! // For simplicity, splits reading process and
141//! // writing process into differrent fibers.
142//! let (reader, writer) = (client.clone(), client);
143//! let (tx, rx) = fibers::sync::mpsc::channel();
144//!
145//! // Spawns a fiber for the writer side.
146//! // When a message is arrived in `rx`,
147//! // this fiber sends it back to the client.
148//! handle1.spawn(rx.map_err(|_| -> io::Error { unreachable!() })
149//! .fold(writer, |writer, buf: Vec<u8>| {
150//! println!("# SEND: {} bytes", buf.len());
151//! writer.async_write_all(buf)
152//! .map(|(w, _)| w)
153//! .map_err(|e| e.into_error())
154//! })
155//! .then(|r| {
156//! println!("# Writer finished: {:?}", r);
157//! Ok(())
158//! }));
159//!
160//! // The reader side is executed in the current fiber.
161//! let stream = vec![0;1024].allow_partial().into_stream(reader);
162//! stream.map_err(|e| e.into_error())
163//! .fold(tx, |tx, (mut buf, len)| {
164//! buf.truncate(len);
165//! println!("# RECV: {} bytes", buf.len());
166//!
167//! // Sends received to the writer half.
168//! tx.send(buf).expect("Cannot send");
169//! Ok(tx) as io::Result<_>
170//! })
171//! })
172//! .then(|r| {
173//! println!("# Client finished: {:?}", r);
174//! Ok(())
175//! }));
176//! Ok(())
177//! })
178//! }));
179//! let result = executor.run_fiber(monitor).expect("Execution failed");
180//! println!("# Listener finished: {:?}", result);
181//! ```
182#![warn(missing_docs)]
183
184extern crate futures;
185extern crate mio;
186extern crate nbchan;
187extern crate num_cpus;
188extern crate splay_tree;
189
190macro_rules! assert_some {
191 ($e:expr) => {
192 match $e {
193 Some(value) => value,
194 None => panic!(
195 "[{}:{}] {:?} must be a Some(..)",
196 file!(),
197 line!(),
198 stringify!($e)
199 ),
200 }
201 };
202}
203
204#[doc(inline)]
205pub use self::executor::{Executor, InPlaceExecutor, ThreadPoolExecutor};
206
207#[doc(inline)]
208pub use self::fiber::{BoxSpawn, Spawn};
209
210pub mod executor;
211pub mod fiber;
212pub mod io;
213pub mod net;
214pub mod sync;
215pub mod time;
216
217mod collections;
218mod sync_atomic;