//! asyncio/core/mod.rs

1use prelude::Protocol;
2use ffi::{RawFd, AsRawFd};
3use error::ErrCode;
4
5use std::io;
6use std::sync::Arc;
7
/// Borrows the `IoContext` an object is associated with.
///
/// # Safety
/// The trait is `unsafe` to implement: implementors are presumably required
/// to return the context the object is actually bound to, since internal
/// machinery trusts this association — confirm against the implementation
/// modules before relying on a custom impl.
pub unsafe trait AsIoContext {
    fn as_ctx(&self) -> &IoContext;
}
11
/// An executor that queues handlers (`post`/`dispatch`) and runs them (`run`).
///
/// Cloning is cheap: clones share the same underlying context through the
/// inner `Arc`.
#[derive(Clone, Debug)]
pub struct IoContext(Arc<Impl>);
14
impl IoContext {
    /// Returns a new `IoContext`.
    ///
    /// # Panics
    /// Panics if too many open files.
    ///
    /// # Examples
    /// ```
    /// use asyncio::IoContext;
    ///
    /// IoContext::new().unwrap();
    /// ```
    pub fn new() -> io::Result<IoContext> {
        Impl::new()
    }

    /// Requests a process to invoke the given handler.
    ///
    /// Handlers are not executed until `run` is called.
    ///
    /// # Examples
    /// ```
    /// use asyncio::IoContext;
    /// use std::sync::atomic::{Ordering, AtomicUsize};
    ///
    /// static COUNT: AtomicUsize = AtomicUsize::new(0);
    ///
    /// let ctx = IoContext::new().unwrap();
    /// ctx.dispatch(|_| { COUNT.fetch_add(1, Ordering::SeqCst); });
    /// ctx.dispatch(|_| { COUNT.fetch_add(1, Ordering::SeqCst); });
    /// ctx.dispatch(|_| { COUNT.fetch_add(1, Ordering::SeqCst); });
    /// assert_eq!(COUNT.load(Ordering::Relaxed), 0);
    ///
    /// ctx.run();
    /// assert_eq!(COUNT.load(Ordering::Relaxed), 3);
    /// ```
    pub fn dispatch<F>(&self, func: F)
        where F: FnOnce(&IoContext) + Send + 'static
    {
        // Adapt the user's one-argument closure to the internal two-argument
        // form, discarding the per-thread context.
        self.do_dispatch(move|ctx: &IoContext, _: &mut ThreadIoContext| func(ctx))
    }

    /// Internal dispatch entry point that also passes the per-thread context.
    #[doc(hidden)]
    pub fn do_dispatch<F>(&self, func: F)
        where F: FnOnce(&IoContext, &mut ThreadIoContext) + Send + 'static
    {
        Impl::do_dispatch(self, func)
    }

    /// Requests a process to invoke the given handler and return immediately.
    ///
    /// # Examples
    /// ```
    /// use asyncio::IoContext;
    /// use std::sync::atomic::{Ordering, AtomicUsize};
    ///
    /// static COUNT: AtomicUsize = AtomicUsize::new(0);
    ///
    /// let ctx = IoContext::new().unwrap();
    /// ctx.post(|_| { COUNT.fetch_add(1, Ordering::SeqCst); });
    /// ctx.post(|_| { COUNT.fetch_add(1, Ordering::SeqCst); });
    /// ctx.post(|_| { COUNT.fetch_add(1, Ordering::SeqCst); });
    /// assert_eq!(COUNT.load(Ordering::Relaxed), 0);
    ///
    /// ctx.run();
    /// assert_eq!(COUNT.load(Ordering::Relaxed), 3);
    /// ```
    pub fn post<F>(&self, func: F)
        where F: FnOnce(&IoContext) + Send + 'static
    {
        // Same adaptation as `dispatch`: drop the per-thread context argument.
        self.do_post(move|ctx: &IoContext, _: &mut ThreadIoContext| func(ctx))
    }

    /// Internal post entry point that also passes the per-thread context.
    #[doc(hidden)]
    pub fn do_post<F>(&self, func: F)
        where F: FnOnce(&IoContext, &mut ThreadIoContext) + Send + 'static
    {
        Impl::do_post(self, func)
    }

    /// Resets a stopped `IoContext`.
    ///
    /// # Examples
    /// ```
    /// use asyncio::IoContext;
    ///
    /// let ctx = IoContext::new().unwrap();
    /// assert_eq!(ctx.stopped(), false);
    /// ctx.stop();
    /// assert_eq!(ctx.stopped(), true);
    /// ctx.restart();
    /// assert_eq!(ctx.stopped(), false);
    /// ```
    pub fn restart(&self) -> bool {
        Impl::restart(self)
    }

    /// Runs all given handlers.
    ///
    /// # Examples
    /// ```
    /// use asyncio::IoContext;
    ///
    /// let ctx = IoContext::new().unwrap();
    /// ctx.run();
    /// ```
    // NOTE(review): the `usize` return is presumably the number of handlers
    // executed — confirm against `Impl::run`.
    pub fn run(&self) -> usize {
        Impl::run(self)
    }

    /// Runs at most one of the given handlers.
    // NOTE(review): semantics inferred from the name by analogy with `run`;
    // confirm against `Impl::run_one`.
    pub fn run_one(&self) -> usize {
        Impl::run_one(self)
    }

    /// Sets a stop request and cancel all of the waiting event in an `IoContext`.
    ///
    /// # Examples
    /// ```
    /// use asyncio::IoContext;
    ///
    /// let ctx = IoContext::new().unwrap();
    /// ctx.stop();
    /// ```
    pub fn stop(&self) {
        Impl::stop(self)
    }

    /// Returns true if this has been stopped.
    ///
    /// # Examples
    /// ```
    /// use asyncio::IoContext;
    ///
    /// let ctx = IoContext::new().unwrap();
    /// assert_eq!(ctx.stopped(), false);
    /// ctx.stop();
    /// assert_eq!(ctx.stopped(), true);
    /// ```
    pub fn stopped(&self) -> bool {
        self.0.stopped()
    }

    /// Creates an `IoContextWork` guard for `ctx`.
    ///
    /// Increments the context's outstanding-work count; the count is
    /// decremented (and the context stopped) when the returned guard is
    /// dropped — see `IoContextWork`.
    pub fn work(ctx: &IoContext) -> IoContextWork {
        ctx.0.work_started();
        IoContextWork(ctx.clone())
    }
}
160
// An `IoContext` is trivially its own associated context.
unsafe impl AsIoContext for IoContext {
    fn as_ctx(&self) -> &IoContext {
        self
    }
}
166
// SAFETY(review): `IoContext` wraps `Arc<Impl>`, which is auto-`Send` only if
// `Impl: Send + Sync`. This manual impl asserts cross-thread transfer is safe
// regardless — confirm that `Impl`'s internals are actually thread-safe.
unsafe impl Send for IoContext { }
168
/// A guard that delays the stop of an `IoContext` until it is dropped.
///
/// # Examples
/// When the `IoContextWork` is dropped, the `IoContext` is stopped:
///
/// ```
/// use asyncio::IoContext;
/// use std::sync::atomic::{Ordering, AtomicUsize};
///
/// static COUNT: AtomicUsize = AtomicUsize::new(0);
///
/// let ctx = &IoContext::new().unwrap();
/// let mut work = Some(IoContext::work(ctx));
///
/// fn count_if_not_stopped(ctx: &IoContext) {
///   if !ctx.stopped() {
///     COUNT.fetch_add(1, Ordering::Relaxed);
///   }
/// }
/// ctx.post(count_if_not_stopped);
/// ctx.post(move |_| work = None);  // call IoContext::stop()
/// ctx.post(count_if_not_stopped);
/// ctx.run();
///
/// assert_eq!(COUNT.load(Ordering::Relaxed), 1);
/// ```
///
/// # Examples
/// A multithreading example code:
///
/// ```rust,no_run
/// use asyncio::IoContext;
/// use std::thread;
/// use std::sync::atomic::{Ordering, AtomicUsize};
///
/// static COUNT: AtomicUsize = AtomicUsize::new(0);
///
/// let ctx = &IoContext::new().unwrap();
/// let _work = IoContext::work(ctx);
///
/// let mut thrds = Vec::new();
/// for _ in 0..10 {
///   let ctx = ctx.clone();
///   thrds.push(thread::spawn(move|| ctx.run()));
/// }
///
/// for _ in 0..100 {
///   ctx.post(move|ctx| {
///     if COUNT.fetch_add(1, Ordering::SeqCst) == 99 {
///       ctx.stop();
///     }
///   });
/// }
///
/// ctx.run();
/// for thrd in thrds {
///   thrd.join().unwrap();
/// }
///
/// assert_eq!(COUNT.load(Ordering::Relaxed), 100);
/// ```
pub struct IoContextWork(IoContext);
231
impl Drop for IoContextWork {
    fn drop(&mut self) {
        // Decrement the outstanding-work count on the shared implementation.
        (self.0).0.work_finished();
        // NOTE(review): the context is stopped unconditionally here, even if
        // other `IoContextWork` guards are still alive — confirm that
        // `work_finished` accounts for this, or that a single guard per
        // context is the intended usage.
        self.0.stop();
    }
}
238
// A work guard exposes the context it keeps alive.
unsafe impl AsIoContext for IoContextWork {
    fn as_ctx(&self) -> &IoContext {
        &self.0
    }
}
244
/// A socket bound to an `IoContext`, backed by a raw file descriptor.
pub trait Socket<P: Protocol> : AsIoContext + AsRawFd + Send + 'static {
    /// Constructs a socket object from an already-open raw descriptor.
    ///
    /// # Safety
    /// `fd` must be a valid, open descriptor appropriate for protocol `pro`;
    /// the new object presumably takes ownership of it — confirm against
    /// the concrete implementations.
    unsafe fn from_raw_fd(&IoContext, pro: P, fd: RawFd) -> Self;
    /// Returns the protocol this socket was created with.
    fn protocol(&self) -> P;
}
249
/// Converts a boxed concrete value into a boxed `T` (typically a trait object).
pub trait Upcast<T: ?Sized>  {
    fn upcast(self: Box<Self>) -> Box<T>;
}
253
/// A one-shot operation invoked by the event loop, consuming its box.
pub trait FnOp {
    /// Executes the operation with the owning context, the per-thread
    /// context, and the completion error code.
    fn call_op(self: Box<Self>, ctx: &IoContext, this: &mut ThreadIoContext, ec: ErrCode);
}
257
// A queued, sendable, boxed one-shot operation.
type Operation = Box<FnOp + Send>;
259
// --- submodules and re-exports ---

mod task_ctx;
// `TaskIoContext` is re-exported as `Impl`, the backing implementation
// that `IoContext` delegates to.
pub use self::task_ctx::{TaskIoContext as Impl, ThreadIoContext, workplace};

mod init;
pub use self::init::Init;

mod callstack;
pub use self::callstack::ThreadCallStack;

mod reactor;
pub use self::reactor::*;

mod interrupter;
pub use self::interrupter::*;

mod scheduler;
pub use self::scheduler::*;
277
#[test]
fn test_new() {
    // Construction must succeed under normal conditions.
    let ctx = IoContext::new();
    assert!(ctx.is_ok());
}
282
#[test]
fn test_run() {
    // With no queued handlers and no work guard, `run` returns immediately
    // and leaves the context stopped.
    let ctx = IoContext::new().unwrap();
    ctx.run();
    assert_eq!(ctx.stopped(), true);
}
289
#[test]
fn test_run_one() {
    // `run_one` on an empty context also leaves it stopped.
    let ctx = IoContext::new().unwrap();
    ctx.run_one();
    assert_eq!(ctx.stopped(), true);
}
296
#[test]
fn test_work() {
    // Dropping the work guard stops the context.
    let ctx = IoContext::new().unwrap();
    let work = IoContext::work(&ctx);
    drop(work);
    assert!(ctx.stopped());
}
305
#[test]
fn test_multithread_working() {
    use std::thread;
    // `ATOMIC_USIZE_INIT` is deprecated (Rust 1.34); `AtomicUsize::new` is a
    // `const fn` and is the supported way to initialize a static atomic.
    use std::sync::atomic::{Ordering, AtomicUsize};

    // Shared counter incremented once per posted handler.
    static COUNT: AtomicUsize = AtomicUsize::new(0);

    let ctx = &IoContext::new().unwrap();
    // Keep the context alive until a handler explicitly stops it.
    let _work = IoContext::work(ctx);

    let mut thrds = Vec::new();
    for _ in 0..10 {
        let ctx = ctx.clone();
        thrds.push(thread::spawn(move|| ctx.run()))
    }

    for _ in 0..100 {
        ctx.post(move |ctx| {
            // The handler observing the 99th previous count is the 100th
            // increment; it stops the context so all `run` calls return.
            if COUNT.fetch_add(1, Ordering::SeqCst) == 99 {
                ctx.stop();
            }
        })
    }

    ctx.run();
    for thrd in thrds {
        thrd.join().unwrap();
    }

    assert_eq!(COUNT.load(Ordering::Relaxed), 100);
}