1use unsafe_cell::UnsafeRefCell;
2use error::ErrCode;
3use core::{IoContext, AsIoContext, ThreadIoContext, FnOp, Upcast};
4use async::{Handler, WrappedHandler, Receiver, Sender, Operation};
5use async::strand::{StrandData, Strand, StrandImmutable, StrandHandler, strand_clone};
6
7use std::marker::PhantomData;
8use context::{Context, Transfer};
9use context::stack::ProtectedFixedSizeStack;
10
11trait FnBox {
12 fn call_box(self: Box<Self>, Coroutine);
13}
14
15impl<F: FnOnce(Coroutine)> FnBox for F {
16 fn call_box(self: Box<Self>, co: Coroutine) {
17 (*self)(co)
18 }
19}
20
21struct InitData {
22 stack: ProtectedFixedSizeStack,
23 ctx: IoContext,
24 func: Box<FnBox>,
25}
26
27extern "C" fn coro_entry(t: Transfer) -> ! {
28 let InitData { stack, ctx, func } = unsafe {
29 let data_opt_ref = &mut *(t.data as *mut Option<InitData>);
30 data_opt_ref.take().unwrap()
31 };
32
33 let context = {
34 let ctx = ctx;
35 let coro = IoContext::strand(&ctx, Some(t.context));
36 let mut data = unsafe { coro.as_mut() };
37 let Transfer { context, data:_ } = data.take().unwrap().resume(&coro as *const _ as usize);
38 *data = Some(context);
39
40 func.call_box(Coroutine(&data));
41 data.take().unwrap()
42 };
43
44 let mut stack_opt = Some(stack);
45 context.resume_ontop(&mut stack_opt as *mut _ as usize, coro_exit);
46
47 unreachable!();
48}
49
50extern "C" fn coro_exit(t: Transfer) -> Transfer {
51 unsafe {
52 let stack_opt_ref = &mut *(t.data as *mut Option<ProtectedFixedSizeStack>);
54 let _ = stack_opt_ref.take().unwrap();
55 }
56 t
57}
58
59fn coro_receiver<R: Send + 'static>(mut coro: Strand<Option<Context>>) -> R {
60 let Transfer { context, data } = coro.take().unwrap().resume(0);
61 *coro = Some(context);
62
63 let data_opt = unsafe { &mut *(data as *mut Option<R>) };
64 data_opt.take().unwrap()
65}
66
67fn coro_sender<R: Send + 'static>(mut coro: Strand<Option<Context>>, data: R) {
68 let mut data_opt = Some(data);
69 let Transfer { context, data } = coro.take().unwrap().resume(&mut data_opt as *mut _ as usize);
70 if data == 0 {
71 *coro = Some(context);
72 }
73}
74
75pub struct Coroutine<'a>(&'a Strand<'a, Option<Context>>);
77
78impl<'a> Coroutine<'a> {
79 pub fn wrap<R, E>(&self) -> CoroutineHandler<R, E>
98 where R: Send + 'static,
99 E: Send + 'static,
100 {
101 CoroutineHandler(self.0.wrap(coro_sender))
102 }
103}
104
105unsafe impl<'a> AsIoContext for Coroutine<'a> {
106 fn as_ctx(&self) -> &IoContext {
107 self.0.as_ctx()
108 }
109}
110
111pub struct CoroutineReceiver<R>(StrandData<Option<Context>>, PhantomData<R>);
112
113impl<R: Send + 'static> Receiver<R> for CoroutineReceiver<R> {
114 fn recv(self, ctx: &IoContext) -> R {
115 coro_receiver(strand_clone(ctx, &self.0))
116 }
117}
118
119pub struct CoroutineHandler<R, E>(
120 StrandHandler<Option<Context>, fn(Strand<Option<Context>>, Result<R, E>), R, E>
121);
122
123impl<R, E> CoroutineHandler<R, E>
124 where R: Send + 'static,
125 E: Send + 'static,
126{
127 fn send(self, ctx: &IoContext, res: Result<R, E>) {
128 self.0.send(ctx, res)
129 }
130}
131
132impl<R, E> Handler<R, E> for CoroutineHandler<R, E>
133 where R: Send + 'static,
134 E: Send + 'static,
135{
136 type Output = Result<R, E>;
137
138 type Receiver = CoroutineReceiver<Self::Output>;
139
140 fn channel<G>(self, op: G) -> (Operation<R, E, G>, Self::Receiver)
141 where G: WrappedHandler<R, E> + Send + 'static
142 {
143 let data = self.0.data.clone();
144 (Box::new((self, op)), CoroutineReceiver(data, PhantomData))
145 }
146
147 fn result(self, _ctx: &IoContext, res: Result<R, E>) -> Self::Output {
148 res
149 }
150}
151
152impl<R, E, G> FnOp for (CoroutineHandler<R, E>, G)
153 where R: Send + 'static,
154 E: Send + 'static,
155 G: WrappedHandler<R, E> + Send + 'static,
156{
157 fn call_op(self: Box<Self>, ctx: &IoContext, this: &mut ThreadIoContext, ec: ErrCode) {
158 (self.0).0.data.clone().run(ctx, this, move|st: Strand<Option<Context>>, this: &mut ThreadIoContext| {
159 let mut g = UnsafeRefCell::new(&self.1);
160 unsafe { g.as_mut() }.perform(st.as_ctx(), this, ec, self)
161 })
162 }
163}
164
165impl<R, E, G> Upcast<FnOp + Send> for (CoroutineHandler<R, E>, G)
166 where R: Send + 'static,
167 E: Send + 'static,
168 G: WrappedHandler<R, E> + Send + 'static,
169{
170 fn upcast(self: Box<Self>) -> Box<FnOp + Send> {
171 self
172 }
173}
174
175impl<R, E, G> Sender<R, E, G> for (CoroutineHandler<R, E>, G)
176 where R: Send + 'static,
177 E: Send + 'static,
178 G: WrappedHandler<R, E> + Send + 'static,
179{
180 fn send(self: Box<Self>, ctx: &IoContext, res: Result<R, E>) {
181 self.0.send(ctx, res)
182 }
183
184 fn as_self(&self) -> &G {
185 &self.1
186 }
187
188 fn as_mut_self(&mut self) -> &mut G {
189 &mut self.1
190 }
191}
192
193impl IoContext {
194 pub fn spawn<F>(ctx: &IoContext, func: F)
195 where F: FnOnce(Coroutine) + 'static
196 {
197 let data = InitData {
198 stack: ProtectedFixedSizeStack::default(),
199 ctx: ctx.clone(),
200 func: Box::new(func),
201 };
202
203 let context = Context::new(&data.stack, coro_entry);
204 let mut data_opt = Some(data);
205 let Transfer { context, data } = context.resume(&mut data_opt as *mut _ as usize);
206 let coro = unsafe { &*(data as *const StrandImmutable<Option<Context>>) };
207 *unsafe { coro.as_mut() } = Some(context);
208
209 coro.post(move |mut coro| {
210 let Transfer { context, data } = coro.take().unwrap().resume(0);
211 if data == 0 {
212 *coro = Some(context)
213 }
214 })
215 }
216}
217
218#[test]
219fn test_spawn_0() {
220 let ctx = &IoContext::new().unwrap();
221 IoContext::spawn(ctx, |_| {});
222 ctx.run();
223}
224
225#[test]
226fn test_spawn_1() {
227 use ip::{IpProtocol, Udp, UdpSocket};
228
229 let ctx = &IoContext::new().unwrap();
230 IoContext::spawn(ctx, |coro| {
231 let ctx = coro.as_ctx();
232 let udp = UdpSocket::new(ctx, Udp::v4()).unwrap();
233 let buf = [0; 256];
234 assert!(udp.async_send(&buf, 0, coro.wrap()).is_err());
235 assert!(udp.async_send(&buf, 0, coro.wrap()).is_err());
236 });
237 ctx.run();
238}