asyncio/core/mod.rs
1use prelude::Protocol;
2use ffi::{RawFd, AsRawFd};
3use error::ErrCode;
4
5use std::io;
6use std::sync::Arc;
7
8pub unsafe trait AsIoContext {
9 fn as_ctx(&self) -> &IoContext;
10}
11
12#[derive(Clone, Debug)]
13pub struct IoContext(Arc<Impl>);
14
15impl IoContext {
16 /// Returns a new `IoContext`.
17 ///
18 /// # Panics
19 /// Panics if too many open files.
20 ///
21 /// # Examples
22 /// ```
23 /// use asyncio::IoContext;
24 ///
25 /// IoContext::new().unwrap();
26 /// ```
27 pub fn new() -> io::Result<IoContext> {
28 Impl::new()
29 }
30
31 /// Requests a process to invoke the given handler.
32 ///
33 /// # Examples
34 /// ```
35 /// use asyncio::IoContext;
36 /// use std::sync::atomic::{Ordering, AtomicUsize, ATOMIC_USIZE_INIT};
37 ///
38 /// static COUNT: AtomicUsize = ATOMIC_USIZE_INIT;
39 ///
40 /// let ctx = IoContext::new().unwrap();
41 /// ctx.dispatch(|_| { COUNT.fetch_add(1, Ordering::SeqCst); });
42 /// ctx.dispatch(|_| { COUNT.fetch_add(1, Ordering::SeqCst); });
43 /// ctx.dispatch(|_| { COUNT.fetch_add(1, Ordering::SeqCst); });
44 /// assert_eq!(COUNT.load(Ordering::Relaxed), 0);
45 ///
46 /// ctx.run();
47 /// assert_eq!(COUNT.load(Ordering::Relaxed), 3);
48 /// ```
49 pub fn dispatch<F>(&self, func: F)
50 where F: FnOnce(&IoContext) + Send + 'static
51 {
52 self.do_dispatch(move|ctx: &IoContext, _: &mut ThreadIoContext| func(ctx))
53 }
54
55 #[doc(hidden)]
56 pub fn do_dispatch<F>(&self, func: F)
57 where F: FnOnce(&IoContext, &mut ThreadIoContext) + Send + 'static
58 {
59 Impl::do_dispatch(self, func)
60 }
61
62 /// Requests a process to invoke the given handler and return immediately.
63 ///
64 /// # Examples
65 /// ```
66 /// use asyncio::IoContext;
67 /// use std::sync::atomic::{Ordering, AtomicUsize, ATOMIC_USIZE_INIT};
68 ///
69 /// static COUNT: AtomicUsize = ATOMIC_USIZE_INIT;
70 ///
71 /// let ctx = IoContext::new().unwrap();
72 /// ctx.post(|_| { COUNT.fetch_add(1, Ordering::SeqCst); });
73 /// ctx.post(|_| { COUNT.fetch_add(1, Ordering::SeqCst); });
74 /// ctx.post(|_| { COUNT.fetch_add(1, Ordering::SeqCst); });
75 /// assert_eq!(COUNT.load(Ordering::Relaxed), 0);
76 ///
77 /// ctx.run();
78 /// assert_eq!(COUNT.load(Ordering::Relaxed), 3);
79 /// ```
80 pub fn post<F>(&self, func: F)
81 where F: FnOnce(&IoContext) + Send + 'static
82 {
83 self.do_post(move|ctx: &IoContext, _: &mut ThreadIoContext| func(ctx))
84 }
85
86 #[doc(hidden)]
87 pub fn do_post<F>(&self, func: F)
88 where F: FnOnce(&IoContext, &mut ThreadIoContext) + Send + 'static
89 {
90 Impl::do_post(self, func)
91 }
92
93 /// Resets a stopped `IoContext`.
94 ///
95 /// # Examples
96 /// ```
97 /// use asyncio::IoContext;
98 ///
99 /// let ctx = IoContext::new().unwrap();
100 /// assert_eq!(ctx.stopped(), false);
101 /// ctx.stop();
102 /// assert_eq!(ctx.stopped(), true);
103 /// ctx.restart();
104 /// assert_eq!(ctx.stopped(), false);
105 /// ```
106 pub fn restart(&self) -> bool {
107 Impl::restart(self)
108 }
109
110 /// Runs all given handlers.
111 ///
112 /// # Examples
113 /// ```
114 /// use asyncio::IoContext;
115 ///
116 /// let ctx = IoContext::new().unwrap();
117 /// ctx.run();
118 /// ```
119 pub fn run(&self) -> usize {
120 Impl::run(self)
121 }
122
123 pub fn run_one(&self) -> usize {
124 Impl::run_one(self)
125 }
126
127 /// Sets a stop request and cancel all of the waiting event in an `IoContext`.
128 ///
129 /// # Examples
130 /// ```
131 /// use asyncio::IoContext;
132 ///
133 /// let ctx = IoContext::new().unwrap();
134 /// ctx.stop();
135 /// ```
136 pub fn stop(&self) {
137 Impl::stop(self)
138 }
139
140 /// Returns true if this has been stopped.
141 ///
142 /// # Examples
143 /// ```
144 /// use asyncio::IoContext;
145 ///
146 /// let ctx = IoContext::new().unwrap();
147 /// assert_eq!(ctx.stopped(), false);
148 /// ctx.stop();
149 /// assert_eq!(ctx.stopped(), true);
150 /// ```
151 pub fn stopped(&self) -> bool {
152 self.0.stopped()
153 }
154
155 pub fn work(ctx: &IoContext) -> IoContextWork {
156 ctx.0.work_started();
157 IoContextWork(ctx.clone())
158 }
159}
160
161unsafe impl AsIoContext for IoContext {
162 fn as_ctx(&self) -> &IoContext {
163 self
164 }
165}
166
167unsafe impl Send for IoContext { }
168
169/// The class to delaying until the stop of `IoContext` is dropped.
170///
171/// # Examples
172/// When dropped the `IoContextWork`, to stop the `IoContext`:
173///
174/// ```
175/// use asyncio::IoContext;
176/// use std::sync::atomic::{Ordering, AtomicUsize, ATOMIC_USIZE_INIT};
177///
178/// static COUNT: AtomicUsize = ATOMIC_USIZE_INIT;
179///
180/// let ctx = &IoContext::new().unwrap();
181/// let mut work = Some(IoContext::work(ctx));
182///
183/// fn count_if_not_stopped(ctx: &IoContext) {
184/// if !ctx.stopped() {
185/// COUNT.fetch_add(1, Ordering::Relaxed);
186/// }
187/// }
188/// ctx.post(count_if_not_stopped);
189/// ctx.post(move |_| work = None); // call IoContext::stop()
190/// ctx.post(count_if_not_stopped);
191/// ctx.run();
192///
193/// assert_eq!(COUNT.load(Ordering::Relaxed), 1);
194/// ```
195///
196/// # Examples
197/// A multithreading example code:
198///
199/// ```rust,no_run
200/// use asyncio::IoContext;
201/// use std::thread;
202/// use std::sync::atomic::{Ordering, AtomicUsize, ATOMIC_USIZE_INIT};
203///
204/// static COUNT: AtomicUsize = ATOMIC_USIZE_INIT;
205///
206/// let ctx = &IoContext::new().unwrap();
207/// let _work = IoContext::work(ctx);
208///
209/// let mut thrds = Vec::new();
210/// for _ in 0..10 {
211/// let ctx = ctx.clone();
212/// thrds.push(thread::spawn(move|| ctx.run()));
213/// }
214///
215/// for _ in 0..100 {
216/// ctx.post(move|ctx| {
217/// if COUNT.fetch_add(1, Ordering::SeqCst) == 99 {
218/// ctx.stop();
219/// }
220/// });
221/// }
222///
223/// ctx.run();
224/// for thrd in thrds {
225/// thrd.join().unwrap();
226/// }
227///
228/// assert_eq!(COUNT.load(Ordering::Relaxed), 100);
229/// ```
230pub struct IoContextWork(IoContext);
231
232impl Drop for IoContextWork {
233 fn drop(&mut self) {
234 (self.0).0.work_finished();
235 self.0.stop();
236 }
237}
238
239unsafe impl AsIoContext for IoContextWork {
240 fn as_ctx(&self) -> &IoContext {
241 &self.0
242 }
243}
244
245pub trait Socket<P: Protocol> : AsIoContext + AsRawFd + Send + 'static {
246 unsafe fn from_raw_fd(&IoContext, pro: P, fd: RawFd) -> Self;
247 fn protocol(&self) -> P;
248}
249
250pub trait Upcast<T: ?Sized> {
251 fn upcast(self: Box<Self>) -> Box<T>;
252}
253
254pub trait FnOp {
255 fn call_op(self: Box<Self>, ctx: &IoContext, this: &mut ThreadIoContext, ec: ErrCode);
256}
257
258type Operation = Box<FnOp + Send>;
259
260mod task_ctx;
261pub use self::task_ctx::{TaskIoContext as Impl, ThreadIoContext, workplace};
262
263mod init;
264pub use self::init::Init;
265
266mod callstack;
267pub use self::callstack::ThreadCallStack;
268
269mod reactor;
270pub use self::reactor::*;
271
272mod interrupter;
273pub use self::interrupter::*;
274
275mod scheduler;
276pub use self::scheduler::*;
277
278#[test]
279fn test_new() {
280 IoContext::new().unwrap();
281}
282
283#[test]
284fn test_run() {
285 let ctx = &IoContext::new().unwrap();
286 ctx.run();
287 assert!(ctx.stopped());
288}
289
290#[test]
291fn test_run_one() {
292 let ctx = &IoContext::new().unwrap();
293 ctx.run_one();
294 assert!(ctx.stopped());
295}
296
297#[test]
298fn test_work() {
299 let ctx = &IoContext::new().unwrap();
300 {
301 let _work = IoContext::work(ctx);
302 }
303 assert!(ctx.stopped());
304}
305
306#[test]
307fn test_multithread_working() {
308 use std::thread;
309 use std::sync::atomic::{Ordering, AtomicUsize, ATOMIC_USIZE_INIT};
310
311 static COUNT: AtomicUsize = ATOMIC_USIZE_INIT;
312
313 let ctx = &IoContext::new().unwrap();
314 let _work = IoContext::work(ctx);
315
316 let mut thrds = Vec::new();
317 for _ in 0..10 {
318 let ctx = ctx.clone();
319 thrds.push(thread::spawn(move|| ctx.run()))
320 }
321
322 for _ in 0..100 {
323 ctx.post(move |ctx| {
324 if COUNT.fetch_add(1, Ordering::SeqCst) == 99 {
325 ctx.stop();
326 }
327 })
328 }
329
330 ctx.run();
331 for thrd in thrds {
332 thrd.join().unwrap();
333 }
334
335 assert_eq!(COUNT.load(Ordering::Relaxed), 100);
336}