asyncio/async/
strand.rs

1use unsafe_cell::UnsafeRefCell;
2use error::ErrCode;
3use core::{IoContext, AsIoContext, ThreadIoContext, FnOp, Upcast};
4use async::{Sender, NullReceiver, Operation, WrappedHandler, Handler};
5
6use std::cell::UnsafeCell;
7use std::marker::PhantomData;
8use std::ops::{Deref, DerefMut};
9use std::sync::{Arc, Mutex};
10use std::collections::VecDeque;
11
12trait FnBox<T> {
13    fn call_box(self: Box<Self>, ctx: &IoContext, this: &mut ThreadIoContext, data: &StrandData<T>);
14}
15
16impl<T, F: FnOnce(Strand<T>, &mut ThreadIoContext)> FnBox<T> for F {
17    fn call_box(self: Box<Self>, ctx: &IoContext, this: &mut ThreadIoContext, data: &StrandData<T>) {
18        (*self)(Strand { ctx: ctx, data: data }, this)
19    }
20}
21
22type Function<T> = Box<FnBox<T>>;
23
24struct StrandQueue<T> {
25    locked: bool,
26    queue: VecDeque<Function<T>>,
27}
28
29pub struct StrandData<T> {
30    mutex: Arc<(Mutex<StrandQueue<T>>, UnsafeCell<T>)>,
31}
32unsafe impl<T> Send for StrandData<T> {
33}
34
35impl<T> StrandData<T> {
36    pub fn run<F>(&self, ctx: &IoContext, this: &mut ThreadIoContext, func: F)
37        where F: FnOnce(Strand<T>, &mut ThreadIoContext) + Send + 'static
38    {
39        {
40            let mut owner = self.mutex.0.lock().unwrap();
41            if owner.locked {
42                owner.queue.push_back(Box::new(func));
43                return;
44            }
45            owner.locked = true;
46        }
47
48        func(Strand { ctx: ctx, data: self }, this);
49
50        while let Some(func) = {
51            let mut owner = self.mutex.0.lock().unwrap();
52            if let Some(func) = owner.queue.pop_front() {
53                Some(func)
54            } else {
55                owner.locked = false;
56                None
57            }
58        } {
59            func.call_box(ctx, this, self);
60        }
61    }
62}
63
64impl<T> Clone for StrandData<T> {
65    fn clone(&self) -> Self {
66        StrandData {
67            mutex: self.mutex.clone()
68        }
69    }
70}
71
72/// Provides serialized data and handler execution.
73pub struct Strand<'a, T: 'a> {
74    ctx: &'a IoContext,
75    data: &'a StrandData<T>,
76}
77
78impl<'a, T> Strand<'a, T> {
79    /// Returns a `&mut T` to the memory safely.
80    pub fn get(&self) -> &mut T {
81        unsafe { &mut *self.data.mutex.1.get() }
82    }
83
84    /// Request the strand to invoke the given handler.
85    pub fn dispatch<F>(&self, func: F)
86        where F: FnOnce(Strand<T>) + Send + 'static
87    {
88        func(Strand { ctx: self.ctx, data: self.data })
89    }
90
91    /// Request the strand to invoke the given handler and return immediately.
92    pub fn post<F>(&self, func: F)
93        where F: FnOnce(Strand<T>) + Send + 'static
94    {
95        let mut owner = self.data.mutex.0.lock().unwrap();
96        owner.queue.push_back(Box::new(move|st: Strand<T>, _: &mut ThreadIoContext| func(st)))
97    }
98
99    /// Provides a `Strand` handler to asynchronous operation.
100    ///
101    /// The StrandHandler has trait the `Handler`, that type of `Handler::Output` is `()`.
102    pub fn wrap<F, R, E>(&self, handler: F) -> StrandHandler<T, F, R, E>
103        where F: FnOnce(Strand<T>, Result<R, E>) + Send + 'static,
104              R: Send + 'static,
105    {
106        StrandHandler {
107            data: self.data.clone(),
108            handler: handler,
109            _marker: PhantomData,
110        }
111    }
112}
113
114unsafe impl<'a, T> AsIoContext for Strand<'a, T> {
115    fn as_ctx(&self) -> &IoContext {
116        self.ctx
117    }
118}
119
120impl<'a, T> Deref for Strand<'a, T> {
121    type Target = T;
122
123    fn deref(&self) -> &T {
124        unsafe { &*self.data.mutex.1.get() }
125    }
126}
127
128impl<'a, T> DerefMut for Strand<'a, T> {
129    fn deref_mut(&mut self) -> &mut T {
130        unsafe { &mut *self.data.mutex.1.get() }
131    }
132}
133
134/// Provides immutable data and handler execution.
135pub struct StrandImmutable<'a, T> {
136    ctx: &'a IoContext,
137    data: StrandData<T>,
138}
139
140impl<'a, T: 'static> StrandImmutable<'a, T> {
141    /// Request the strand to invoke the given handler.
142    pub fn dispatch<F>(&self, func: F)
143        where F: FnOnce(Strand<T>) + Send + 'static
144    {
145        let data = self.data.clone();
146        self.ctx.do_dispatch(move|ctx: &IoContext, this: &mut ThreadIoContext| {
147            data.run(ctx, this, move|st: Strand<T>, _: &mut ThreadIoContext| {
148                func(st)
149            })
150        })
151    }
152
153    /// Request the strand to invoke the given handler and return immediately.
154    pub fn post<F>(&self, func: F)
155        where F: FnOnce(Strand<T>) + Send + 'static
156    {
157        let data = self.data.clone();
158        self.ctx.do_post(move|ctx: &IoContext, this: &mut ThreadIoContext| {
159            data.run(ctx, this, move|st: Strand<T>, _: &mut ThreadIoContext| {
160                func(st)
161            })
162        })
163    }
164
165    pub unsafe fn as_mut(&'a self) -> Strand<'a, T> {
166        Strand {
167            ctx: self.ctx,
168            data: &self.data,
169        }
170    }
171}
172
173unsafe impl<'a, T> AsIoContext for StrandImmutable<'a, T> {
174    fn as_ctx(&self) -> &IoContext {
175        self.ctx
176    }
177}
178
179impl<'a, T> Deref for StrandImmutable<'a, T> {
180    type Target = T;
181
182    fn deref(&self) -> &Self::Target {
183        unsafe { &*self.data.mutex.1.get() }
184    }
185}
186
187/// The binding Strand handler.
188pub struct StrandHandler<T, F, R, E> {
189    pub data: StrandData<T>,
190    handler: F,
191    _marker: PhantomData<(R, E)>,
192}
193
194impl<T, F, R, E> StrandHandler<T, F, R, E>
195    where T: 'static,
196          F: FnOnce(Strand<T>, Result<R, E>) + Send + 'static,
197          R: Send + 'static,
198          E: Send + 'static,
199{
200    pub fn send(self, ctx: &IoContext, res: Result<R, E>) {
201        let StrandHandler { data, handler, _marker } = self;
202        handler(Strand { ctx: ctx, data: &data }, res)
203    }
204}
205
206impl<T, F, R, E> Handler<R, E> for StrandHandler<T, F, R, E>
207    where T: 'static,
208          F: FnOnce(Strand<T>, Result<R, E>) + Send + 'static,
209          R: Send + 'static,
210          E: Send + 'static,
211{
212    type Output = ();
213
214    type Receiver = NullReceiver;
215
216    fn channel<G>(self, op: G) -> (Operation<R, E, G>, Self::Receiver)
217        where G: WrappedHandler<R, E> + Send + 'static
218    {
219        (Box::new((self, op)), NullReceiver)
220    }
221
222    fn result(self, ctx: &IoContext, res: Result<R, E>) -> Self::Output {
223        let StrandHandler { data, handler, _marker } = self;
224        handler(Strand { ctx: ctx, data: &data }, res)
225    }
226}
227
228impl<T, F, R, E, G> Upcast<FnOp + Send> for (StrandHandler<T, F, R, E>, G)
229    where T: 'static,
230          F: FnOnce(Strand<T>, Result<R, E>) + Send + 'static,
231          R: Send + 'static,
232          E: Send + 'static,
233          G: WrappedHandler<R, E> + Send + 'static,
234{
235    fn upcast(self: Box<Self>) -> Box<FnOp + Send> {
236        self
237    }
238}
239
240impl<T, F, R, E, G> Sender<R, E, G> for (StrandHandler<T, F, R, E>, G)
241    where T: 'static,
242          F: FnOnce(Strand<T>, Result<R, E>) + Send + 'static,
243          R: Send + 'static,
244          E: Send + 'static,
245          G: WrappedHandler<R, E> + Send + 'static,
246{
247    fn send(self: Box<Self>, ctx: &IoContext, res: Result<R, E>) {
248        ctx.post(move|ctx| self.0.send(ctx, res))
249    }
250
251    fn as_self(&self) -> &G {
252        &self.1
253    }
254
255    fn as_mut_self(&mut self) -> &mut G {
256        &mut self.1
257    }
258}
259
260impl<T, F, R, E, G> FnOp for (StrandHandler<T, F, R, E>, G)
261    where T: 'static,
262          F: FnOnce(Strand<T>, Result<R, E>) + Send + 'static,
263          R: Send + 'static,
264          E: Send + 'static,
265          G: WrappedHandler<R, E> + Send + 'static,
266{
267    fn call_op(self: Box<Self>, ctx: &IoContext, this: &mut ThreadIoContext, ec: ErrCode) {
268        self.0.data.clone().run(ctx, this, move |st, this| {
269            let mut g = UnsafeRefCell::new(&self.1);
270            unsafe { g.as_mut() }.perform(st.as_ctx(), this, ec, self)
271        })
272    }
273}
274
275pub fn strand_clone<'a, T>(ctx: &'a IoContext, data: &'a StrandData<T>) -> Strand<'a, T> {
276    Strand { ctx: ctx, data: data }
277}
278
279impl IoContext {
280    pub fn strand<'a, T>(ctx: &'a IoContext, data: T) -> StrandImmutable<'a, T> {
281        StrandImmutable {
282            ctx: ctx,
283            data: StrandData {
284                mutex: Arc::new((Mutex::new(StrandQueue {
285                    locked: false,
286                    queue: VecDeque::new(),
287                }), UnsafeCell::new(data)))
288            },
289        }
290    }
291}
292
/// Writing through a `Strand` obtained with `as_mut` is visible when read back.
#[test]
fn test_strand() {
    let ctx = &IoContext::new().unwrap();
    let immutable = IoContext::strand(ctx, 0);
    let mut strand = unsafe { immutable.as_mut() };
    *strand = 1;
    assert_eq!(*strand, 1);
}
301
/// A dispatched handler has updated the data once the context has been run.
#[test]
fn test_strand_dispatch() {
    let ctx = &IoContext::new().unwrap();
    let strand = IoContext::strand(ctx, 0);
    strand.dispatch(|mut inner| {
        *inner = 1;
    });
    ctx.run();
    assert_eq!(*strand, 1);
}
310
/// A posted handler has updated the data once the context has been run.
#[test]
fn test_strand_post() {
    let ctx = &IoContext::new().unwrap();
    let strand = IoContext::strand(ctx, 0);
    strand.post(|mut inner| {
        *inner = 1;
    });
    ctx.run();
    assert_eq!(*strand, 1);
}