// Copyright (c) 2016 DWANGO Co., Ltd. All Rights Reserved.
// See the LICENSE file at the top-level directory of this distribution.

//! This is a library to execute a number of lightweight asynchronous tasks (a.k.a. fibers).
//!
//! Note that `fibers` heavily uses [futures][futures] to
//! represent asynchronous tasks. If you are not familiar with it,
//! we recommend that you refer to the `README.md` and `TUTORIAL.md` of [futures][futures]
//! before reading the following.
//!
//! This library also uses [mio](https://github.com/carllerche/mio) to achieve
//! efficient asynchronous I/O handling (mainly for networking primitives).
//! However, its existence is hidden from the user, so you do not usually have to worry about it.
//!
//! [futures]: https://github.com/alexcrichton/futures-rs
//!
//! ---
//!
//! `Future` is an excellent way to represent an asynchronous task.
//! It is intuitive and easily composed with other futures to represent a complicated task,
//! without runtime overhead.
//! However, one problem remains:
//! "How to efficiently execute (possibly a very large number of) concurrent tasks?".
//! `fibers` is an answer to that problem.
//!
//! Conceptually, the responsibility of `fibers` is very simple.
//! It represents an asynchronous task (a.k.a. fiber) as a future instance,
//! and an executor takes futures and executes them as follows.
//!
//! ```ignore
//! // Creates an executor.
//! let mut executor = ThreadPoolExecutor::new().unwrap();
//!
//! // Spawns fibers (i.e., passes futures to the executor).
//! executor.spawn(futures::lazy(|| { println!("Hello"); Ok(())} ));
//! executor.spawn(futures::lazy(|| { println!("World!"); Ok(())} ));
//!
//! // Executes them.
//! executor.run().unwrap();
//! ```
//!
//! Fibers may run on different background threads, but the user does not need to be aware of it.
//! On machines with a large number of processors, performance will improve naturally.
//!
//! Roughly speaking, if a future returns `Async::NotReady` in response to
//! a call of the `Future::poll` method,
//! the fiber associated with the future moves into the "waiting" state.
//! It is then suspended (descheduled) until an event in which the future is interested happens
//! (e.g., data arrives on a target TCP socket).
//! Finally, if a future returns `Async::Ready`,
//! the fiber is regarded as completed and the executor drops the fiber.
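//!
//! As a rough illustration of this life cycle, the following self-contained sketch models
//! a task that reports "not ready" a few times before completing.
//! Note that `Async`, `Countdown` and `run_to_completion` are hypothetical stand-ins
//! defined only for this example; they are not part of `fibers` or `futures`,
//! and a real executor suspends a waiting fiber instead of polling it again immediately.
//!
//! ```rust
//! // Hypothetical stand-in for `futures::Async`, defined here to keep the sketch self-contained.
//! enum Async<T> {
//!     Ready(T),
//!     NotReady,
//! }
//!
//! // Hypothetical task that answers `NotReady` for the first `remaining` polls.
//! struct Countdown {
//!     remaining: u32,
//! }
//!
//! impl Countdown {
//!     fn poll(&mut self) -> Async<&'static str> {
//!         if self.remaining == 0 {
//!             Async::Ready("done")
//!         } else {
//!             self.remaining -= 1;
//!             Async::NotReady
//!         }
//!     }
//! }
//!
//! // Polls the task until it completes; returns the result and the number of polls.
//! fn run_to_completion(mut task: Countdown) -> (&'static str, u32) {
//!     let mut polls = 0;
//!     loop {
//!         polls += 1;
//!         match task.poll() {
//!             // Completed: the task is dropped when this function returns.
//!             Async::Ready(value) => return (value, polls),
//!             // "Waiting": this toy loop simply polls again, whereas a real
//!             // executor would deschedule the fiber until an event occurs.
//!             Async::NotReady => {}
//!         }
//!     }
//! }
//! ```
//!
//! With `Countdown { remaining: 2 }`, `run_to_completion` sees `NotReady` twice
//! and `Ready` on the third poll.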
//!
//! This library provides primitives for writing programs in an efficient
//! asynchronous fashion (see the documentation of the [net](net/index.html),
//! [sync](sync/index.html), [io](io/index.html), and [time](time/index.html) modules for more details).
//!
//! The main concern of this library is "how to execute fibers".
//! It is therefore preferable to use external crates (e.g., [handy_async][handy_async])
//! to describe "how to represent asynchronous tasks".
//!
//! [handy_async]: https://github.com/sile/handy_async
//!
//! Examples
//! ========
//!
//! The following are examples of writing code to perform asynchronous tasks.
//!
//! Other examples are found in the "fibers/examples" directory.
//! You can run an example by executing the following command.
//!
//! ```bash
//! $ cargo run --example ${EXAMPLE_NAME}
//! ```
//!
//! ### Calculation of Fibonacci numbers
//!
//! ```
//! # extern crate fibers;
//! # extern crate futures;
//! use fibers::{Spawn, Executor, ThreadPoolExecutor};
//! use futures::{Future, BoxFuture};
//!
//! fn fibonacci<H: Spawn + Clone>(n: usize, handle: H) -> BoxFuture<usize, ()> {
//!     if n < 2 {
//!         futures::finished(n).boxed()
//!     } else {
//!         // Spawns a new fiber per recursive call.
//!         let f0 = handle.spawn_monitor(fibonacci(n - 1, handle.clone()));
//!         let f1 = handle.spawn_monitor(fibonacci(n - 2, handle.clone()));
//!         f0.join(f1).map(|(a0, a1)| a0 + a1).map_err(|_| ()).boxed()
//!     }
//! }
//!
//! # fn main() {
//! // Creates an executor instance.
//! let mut executor = ThreadPoolExecutor::new().unwrap();
//!
//! // Creates a future which will calculate the Fibonacci number of `10`.
//! let input_number = 10;
//! let future = fibonacci(input_number, executor.handle());
//!
//! // Spawns and executes the future (fiber).
//! let monitor = executor.spawn_monitor(future);
//! let answer = executor.run_fiber(monitor).unwrap();
//!
//! // Checks the answer.
//! assert_eq!(answer, Ok(55));
//! # }
//! ```
//!
//! ### TCP Echo Server
//!
//! An example of a TCP echo server listening at the address "127.0.0.1:3000":
//!
//! ```no_run
//! # extern crate fibers;
//! # extern crate futures;
//! # extern crate handy_async;
//! use std::io;
//! use fibers::{Spawn, Executor, ThreadPoolExecutor};
//! use fibers::net::TcpListener;
//! use futures::{Future, Stream};
//! use handy_async::io::{AsyncWrite, ReadFrom};
//! use handy_async::pattern::AllowPartial;
//!
//! # fn main() {
//! let server_addr = "127.0.0.1:3000".parse().expect("Invalid TCP bind address");
//!
//! let mut executor = ThreadPoolExecutor::new().expect("Cannot create Executor");
//! let handle0 = executor.handle();
//! let monitor = executor.spawn_monitor(TcpListener::bind(server_addr)
//!     .and_then(move |listener| {
//!         println!("# Start listening: {}", server_addr);
//!
//!         // Creates a stream of incoming TCP client sockets
//!         listener.incoming().for_each(move |(client, addr)| {
//!             // A new client is connected.
//!             println!("# CONNECTED: {}", addr);
//!             let handle1 = handle0.clone();
//!
//!             // Spawns a fiber to handle the client.
//!             handle0.spawn(client.and_then(move |client| {
//!                     // For simplicity, splits reading process and
//!                     // writing process into different fibers.
//!                     let (reader, writer) = (client.clone(), client);
//!                     let (tx, rx) = fibers::sync::mpsc::channel();
//!
//!                     // Spawns a fiber for the writer side.
//!                     // When a message arrives in `rx`,
//!                     // this fiber sends it back to the client.
//!                     handle1.spawn(rx.map_err(|_| -> io::Error { unreachable!() })
//!                         .fold(writer, |writer, buf: Vec<u8>| {
//!                             println!("# SEND: {} bytes", buf.len());
//!                             writer.async_write_all(buf)
//!                                   .map(|(w, _)| w)
//!                                   .map_err(|e| e.into_error())
//!                         })
//!                         .then(|r| {
//!                             println!("# Writer finished: {:?}", r);
//!                             Ok(())
//!                         }));
//!
//!                     // The reader side is executed in the current fiber.
//!                     let stream = vec![0;1024].allow_partial().into_stream(reader);
//!                     stream.map_err(|e| e.into_error())
//!                         .fold(tx, |tx, (mut buf, len)| {
//!                             buf.truncate(len);
//!                             println!("# RECV: {} bytes", buf.len());
//!
//!                             // Sends the received data to the writer half.
//!                             tx.send(buf).expect("Cannot send");
//!                             Ok(tx) as io::Result<_>
//!                         })
//!                 })
//!                 .then(|r| {
//!                     println!("# Client finished: {:?}", r);
//!                     Ok(())
//!                 }));
//!             Ok(())
//!         })
//!     }));
//! let result = executor.run_fiber(monitor).expect("Execution failed");
//! println!("# Listener finished: {:?}", result);
//! # }
//! ```
#![warn(missing_docs)]

extern crate mio;
extern crate futures;
extern crate splay_tree;
extern crate num_cpus;
extern crate handy_async;
extern crate nbchan;

macro_rules! assert_some {
    ($e:expr) => {
        $e.expect(&format!("[{}:{}] {:?} must be a Some(..)",
                           file!(), line!(), stringify!($e)))
    }
}

macro_rules! assert_ok {
    ($e:expr) => {
        $e.expect(&format!("[{}:{}] {:?} must be an Ok(..)",
                           file!(), line!(), stringify!($e)))
    }
}

#[doc(inline)]
pub use self::executor::{Executor, InPlaceExecutor, ThreadPoolExecutor};

#[doc(inline)]
pub use self::fiber::{Spawn, BoxSpawn};

pub mod io;
pub mod net;
pub mod sync;
pub mod time;
pub mod fiber;
pub mod executor;

mod internal;

#[cfg(test)]
mod tests {
    #[test]
    fn it_works() {}
}