1use unsafe_cell::UnsafeRefCell;
2use error::ErrCode;
3use core::{IoContext, AsIoContext, ThreadIoContext, FnOp, Upcast};
4use async::{Sender, NullReceiver, Operation, WrappedHandler, Handler};
5
6use std::cell::UnsafeCell;
7use std::marker::PhantomData;
8use std::ops::{Deref, DerefMut};
9use std::sync::{Arc, Mutex};
10use std::collections::VecDeque;
11
12trait FnBox<T> {
13 fn call_box(self: Box<Self>, ctx: &IoContext, this: &mut ThreadIoContext, data: &StrandData<T>);
14}
15
16impl<T, F: FnOnce(Strand<T>, &mut ThreadIoContext)> FnBox<T> for F {
17 fn call_box(self: Box<Self>, ctx: &IoContext, this: &mut ThreadIoContext, data: &StrandData<T>) {
18 (*self)(Strand { ctx: ctx, data: data }, this)
19 }
20}
21
22type Function<T> = Box<FnBox<T>>;
23
24struct StrandQueue<T> {
25 locked: bool,
26 queue: VecDeque<Function<T>>,
27}
28
29pub struct StrandData<T> {
30 mutex: Arc<(Mutex<StrandQueue<T>>, UnsafeCell<T>)>,
31}
32unsafe impl<T> Send for StrandData<T> {
33}
34
35impl<T> StrandData<T> {
36 pub fn run<F>(&self, ctx: &IoContext, this: &mut ThreadIoContext, func: F)
37 where F: FnOnce(Strand<T>, &mut ThreadIoContext) + Send + 'static
38 {
39 {
40 let mut owner = self.mutex.0.lock().unwrap();
41 if owner.locked {
42 owner.queue.push_back(Box::new(func));
43 return;
44 }
45 owner.locked = true;
46 }
47
48 func(Strand { ctx: ctx, data: self }, this);
49
50 while let Some(func) = {
51 let mut owner = self.mutex.0.lock().unwrap();
52 if let Some(func) = owner.queue.pop_front() {
53 Some(func)
54 } else {
55 owner.locked = false;
56 None
57 }
58 } {
59 func.call_box(ctx, this, self);
60 }
61 }
62}
63
64impl<T> Clone for StrandData<T> {
65 fn clone(&self) -> Self {
66 StrandData {
67 mutex: self.mutex.clone()
68 }
69 }
70}
71
72pub struct Strand<'a, T: 'a> {
74 ctx: &'a IoContext,
75 data: &'a StrandData<T>,
76}
77
78impl<'a, T> Strand<'a, T> {
79 pub fn get(&self) -> &mut T {
81 unsafe { &mut *self.data.mutex.1.get() }
82 }
83
84 pub fn dispatch<F>(&self, func: F)
86 where F: FnOnce(Strand<T>) + Send + 'static
87 {
88 func(Strand { ctx: self.ctx, data: self.data })
89 }
90
91 pub fn post<F>(&self, func: F)
93 where F: FnOnce(Strand<T>) + Send + 'static
94 {
95 let mut owner = self.data.mutex.0.lock().unwrap();
96 owner.queue.push_back(Box::new(move|st: Strand<T>, _: &mut ThreadIoContext| func(st)))
97 }
98
99 pub fn wrap<F, R, E>(&self, handler: F) -> StrandHandler<T, F, R, E>
103 where F: FnOnce(Strand<T>, Result<R, E>) + Send + 'static,
104 R: Send + 'static,
105 {
106 StrandHandler {
107 data: self.data.clone(),
108 handler: handler,
109 _marker: PhantomData,
110 }
111 }
112}
113
114unsafe impl<'a, T> AsIoContext for Strand<'a, T> {
115 fn as_ctx(&self) -> &IoContext {
116 self.ctx
117 }
118}
119
120impl<'a, T> Deref for Strand<'a, T> {
121 type Target = T;
122
123 fn deref(&self) -> &T {
124 unsafe { &*self.data.mutex.1.get() }
125 }
126}
127
128impl<'a, T> DerefMut for Strand<'a, T> {
129 fn deref_mut(&mut self) -> &mut T {
130 unsafe { &mut *self.data.mutex.1.get() }
131 }
132}
133
134pub struct StrandImmutable<'a, T> {
136 ctx: &'a IoContext,
137 data: StrandData<T>,
138}
139
140impl<'a, T: 'static> StrandImmutable<'a, T> {
141 pub fn dispatch<F>(&self, func: F)
143 where F: FnOnce(Strand<T>) + Send + 'static
144 {
145 let data = self.data.clone();
146 self.ctx.do_dispatch(move|ctx: &IoContext, this: &mut ThreadIoContext| {
147 data.run(ctx, this, move|st: Strand<T>, _: &mut ThreadIoContext| {
148 func(st)
149 })
150 })
151 }
152
153 pub fn post<F>(&self, func: F)
155 where F: FnOnce(Strand<T>) + Send + 'static
156 {
157 let data = self.data.clone();
158 self.ctx.do_post(move|ctx: &IoContext, this: &mut ThreadIoContext| {
159 data.run(ctx, this, move|st: Strand<T>, _: &mut ThreadIoContext| {
160 func(st)
161 })
162 })
163 }
164
165 pub unsafe fn as_mut(&'a self) -> Strand<'a, T> {
166 Strand {
167 ctx: self.ctx,
168 data: &self.data,
169 }
170 }
171}
172
173unsafe impl<'a, T> AsIoContext for StrandImmutable<'a, T> {
174 fn as_ctx(&self) -> &IoContext {
175 self.ctx
176 }
177}
178
179impl<'a, T> Deref for StrandImmutable<'a, T> {
180 type Target = T;
181
182 fn deref(&self) -> &Self::Target {
183 unsafe { &*self.data.mutex.1.get() }
184 }
185}
186
187pub struct StrandHandler<T, F, R, E> {
189 pub data: StrandData<T>,
190 handler: F,
191 _marker: PhantomData<(R, E)>,
192}
193
194impl<T, F, R, E> StrandHandler<T, F, R, E>
195 where T: 'static,
196 F: FnOnce(Strand<T>, Result<R, E>) + Send + 'static,
197 R: Send + 'static,
198 E: Send + 'static,
199{
200 pub fn send(self, ctx: &IoContext, res: Result<R, E>) {
201 let StrandHandler { data, handler, _marker } = self;
202 handler(Strand { ctx: ctx, data: &data }, res)
203 }
204}
205
206impl<T, F, R, E> Handler<R, E> for StrandHandler<T, F, R, E>
207 where T: 'static,
208 F: FnOnce(Strand<T>, Result<R, E>) + Send + 'static,
209 R: Send + 'static,
210 E: Send + 'static,
211{
212 type Output = ();
213
214 type Receiver = NullReceiver;
215
216 fn channel<G>(self, op: G) -> (Operation<R, E, G>, Self::Receiver)
217 where G: WrappedHandler<R, E> + Send + 'static
218 {
219 (Box::new((self, op)), NullReceiver)
220 }
221
222 fn result(self, ctx: &IoContext, res: Result<R, E>) -> Self::Output {
223 let StrandHandler { data, handler, _marker } = self;
224 handler(Strand { ctx: ctx, data: &data }, res)
225 }
226}
227
228impl<T, F, R, E, G> Upcast<FnOp + Send> for (StrandHandler<T, F, R, E>, G)
229 where T: 'static,
230 F: FnOnce(Strand<T>, Result<R, E>) + Send + 'static,
231 R: Send + 'static,
232 E: Send + 'static,
233 G: WrappedHandler<R, E> + Send + 'static,
234{
235 fn upcast(self: Box<Self>) -> Box<FnOp + Send> {
236 self
237 }
238}
239
240impl<T, F, R, E, G> Sender<R, E, G> for (StrandHandler<T, F, R, E>, G)
241 where T: 'static,
242 F: FnOnce(Strand<T>, Result<R, E>) + Send + 'static,
243 R: Send + 'static,
244 E: Send + 'static,
245 G: WrappedHandler<R, E> + Send + 'static,
246{
247 fn send(self: Box<Self>, ctx: &IoContext, res: Result<R, E>) {
248 ctx.post(move|ctx| self.0.send(ctx, res))
249 }
250
251 fn as_self(&self) -> &G {
252 &self.1
253 }
254
255 fn as_mut_self(&mut self) -> &mut G {
256 &mut self.1
257 }
258}
259
260impl<T, F, R, E, G> FnOp for (StrandHandler<T, F, R, E>, G)
261 where T: 'static,
262 F: FnOnce(Strand<T>, Result<R, E>) + Send + 'static,
263 R: Send + 'static,
264 E: Send + 'static,
265 G: WrappedHandler<R, E> + Send + 'static,
266{
267 fn call_op(self: Box<Self>, ctx: &IoContext, this: &mut ThreadIoContext, ec: ErrCode) {
268 self.0.data.clone().run(ctx, this, move |st, this| {
269 let mut g = UnsafeRefCell::new(&self.1);
270 unsafe { g.as_mut() }.perform(st.as_ctx(), this, ec, self)
271 })
272 }
273}
274
275pub fn strand_clone<'a, T>(ctx: &'a IoContext, data: &'a StrandData<T>) -> Strand<'a, T> {
276 Strand { ctx: ctx, data: data }
277}
278
279impl IoContext {
280 pub fn strand<'a, T>(ctx: &'a IoContext, data: T) -> StrandImmutable<'a, T> {
281 StrandImmutable {
282 ctx: ctx,
283 data: StrandData {
284 mutex: Arc::new((Mutex::new(StrandQueue {
285 locked: false,
286 queue: VecDeque::new(),
287 }), UnsafeCell::new(data)))
288 },
289 }
290 }
291}
292
293#[test]
294fn test_strand() {
295 let ctx = &IoContext::new().unwrap();
296 let st = IoContext::strand(ctx, 0);
297 let mut st = unsafe { st.as_mut() };
298 *st = 1;
299 assert_eq!(*st, 1);
300}
301
302#[test]
303fn test_strand_dispatch() {
304 let ctx = &IoContext::new().unwrap();
305 let st = IoContext::strand(ctx, 0);
306 st.dispatch(|mut st| *st = 1);
307 ctx.run();
308 assert_eq!(*st, 1);
309}
310
311#[test]
312fn test_strand_post() {
313 let ctx = &IoContext::new().unwrap();
314 let st = IoContext::strand(ctx, 0);
315 st.post(|mut st| *st = 1);
316 ctx.run();
317 assert_eq!(*st, 1);
318}