asyncio/async/
coroutine.rs

1use unsafe_cell::UnsafeRefCell;
2use error::ErrCode;
3use core::{IoContext, AsIoContext, ThreadIoContext, FnOp, Upcast};
4use async::{Handler, WrappedHandler, Receiver, Sender, Operation};
5use async::strand::{StrandData, Strand, StrandImmutable, StrandHandler, strand_clone};
6
7use std::marker::PhantomData;
8use context::{Context, Transfer};
9use context::stack::ProtectedFixedSizeStack;
10
11trait FnBox {
12    fn call_box(self: Box<Self>, Coroutine);
13}
14
15impl<F: FnOnce(Coroutine)> FnBox for F {
16    fn call_box(self: Box<Self>, co: Coroutine) {
17        (*self)(co)
18    }
19}
20
21struct InitData {
22    stack: ProtectedFixedSizeStack,
23    ctx: IoContext,
24    func: Box<FnBox>,
25}
26
27extern "C" fn coro_entry(t: Transfer) -> ! {
28    let InitData { stack, ctx, func } = unsafe {
29        let data_opt_ref = &mut *(t.data as *mut Option<InitData>);
30        data_opt_ref.take().unwrap()
31    };
32
33    let context = {
34        let ctx = ctx;
35        let coro = IoContext::strand(&ctx, Some(t.context));
36        let mut data = unsafe { coro.as_mut() };
37        let Transfer { context, data:_ } = data.take().unwrap().resume(&coro as *const _ as usize);
38        *data = Some(context);
39
40        func.call_box(Coroutine(&data));
41        data.take().unwrap()
42    };
43
44    let mut stack_opt = Some(stack);
45    context.resume_ontop(&mut stack_opt as *mut _ as usize, coro_exit);
46
47    unreachable!();
48}
49
50extern "C" fn coro_exit(t: Transfer) -> Transfer {
51    unsafe {
52        // Drop the stack
53        let stack_opt_ref = &mut *(t.data as *mut Option<ProtectedFixedSizeStack>);
54        let _ = stack_opt_ref.take().unwrap();
55    }
56    t
57}
58
59fn coro_receiver<R: Send + 'static>(mut coro: Strand<Option<Context>>) -> R {
60    let Transfer { context, data } = coro.take().unwrap().resume(0);
61    *coro = Some(context);
62
63    let data_opt = unsafe { &mut *(data as *mut Option<R>) };
64    data_opt.take().unwrap()
65}
66
67fn coro_sender<R: Send + 'static>(mut coro: Strand<Option<Context>>, data: R) {
68    let mut data_opt = Some(data);
69    let Transfer { context, data } = coro.take().unwrap().resume(&mut data_opt as *mut _ as usize);
70    if data == 0 {
71        *coro = Some(context);
72    }
73}
74
75/// Context object that represents the currently executing coroutine.
76pub struct Coroutine<'a>(&'a Strand<'a, Option<Context>>);
77
78impl<'a> Coroutine<'a> {
79    /// Provides a `Coroutine` handler to asynchronous operation.
80    ///
81    /// The CoroutineHandler has trait the `Handler`, that type of `Handler::Output` is `io::Result<R>`.
82    ///
83    /// # Examples
84    ///
85    /// ```
86    /// use asyncio::{IoContext, AsIoContext, Stream};
87    /// use asyncio::ip::{IpProtocol, Tcp, TcpSocket};
88    ///
89    /// let ctx = &IoContext::new().unwrap();
90    /// IoContext::spawn(ctx, |coro| {
91    ///   let ctx = coro.as_ctx();
92    ///   let mut soc = TcpSocket::new(ctx, Tcp::v4()).unwrap();
93    ///   let mut buf = [0; 256];
94    ///   let size = soc.async_read_some(&mut buf, coro.wrap()).unwrap();
95    /// });
96    /// ```
97    pub fn wrap<R, E>(&self) -> CoroutineHandler<R, E>
98        where R: Send + 'static,
99              E: Send + 'static,
100    {
101        CoroutineHandler(self.0.wrap(coro_sender))
102    }
103}
104
105unsafe impl<'a> AsIoContext for Coroutine<'a> {
106    fn as_ctx(&self) -> &IoContext {
107        self.0.as_ctx()
108    }
109}
110
111pub struct CoroutineReceiver<R>(StrandData<Option<Context>>, PhantomData<R>);
112
113impl<R: Send + 'static> Receiver<R> for CoroutineReceiver<R> {
114    fn recv(self, ctx: &IoContext) -> R {
115        coro_receiver(strand_clone(ctx, &self.0))
116    }
117}
118
119pub struct CoroutineHandler<R, E>(
120    StrandHandler<Option<Context>, fn(Strand<Option<Context>>, Result<R, E>), R, E>
121);
122
123impl<R, E> CoroutineHandler<R, E>
124    where R: Send + 'static,
125          E: Send + 'static,
126{
127    fn send(self, ctx: &IoContext, res: Result<R, E>) {
128        self.0.send(ctx, res)
129    }
130}
131
132impl<R, E> Handler<R, E> for CoroutineHandler<R, E>
133    where R: Send + 'static,
134          E: Send + 'static,
135{
136    type Output = Result<R, E>;
137
138    type Receiver = CoroutineReceiver<Self::Output>;
139
140    fn channel<G>(self, op: G) -> (Operation<R, E, G>, Self::Receiver)
141        where G: WrappedHandler<R, E> + Send + 'static
142    {
143        let data = self.0.data.clone();
144        (Box::new((self, op)), CoroutineReceiver(data, PhantomData))
145    }
146
147    fn result(self, _ctx: &IoContext, res: Result<R, E>) -> Self::Output {
148        res
149    }
150}
151
152impl<R, E, G> FnOp for (CoroutineHandler<R, E>, G)
153    where R: Send + 'static,
154          E: Send + 'static,
155          G: WrappedHandler<R, E> + Send + 'static,
156{
157    fn call_op(self: Box<Self>, ctx: &IoContext, this: &mut ThreadIoContext, ec: ErrCode) {
158        (self.0).0.data.clone().run(ctx, this, move|st: Strand<Option<Context>>, this: &mut ThreadIoContext| {
159            let mut g = UnsafeRefCell::new(&self.1);
160            unsafe { g.as_mut() }.perform(st.as_ctx(), this, ec, self)
161        })
162    }
163}
164
165impl<R, E, G> Upcast<FnOp + Send> for (CoroutineHandler<R, E>, G)
166    where R: Send + 'static,
167          E: Send + 'static,
168          G: WrappedHandler<R, E> + Send + 'static,
169{
170    fn upcast(self: Box<Self>) -> Box<FnOp + Send> {
171        self
172    }
173}
174
175impl<R, E, G> Sender<R, E, G> for (CoroutineHandler<R, E>, G)
176    where R: Send + 'static,
177          E: Send + 'static,
178          G: WrappedHandler<R, E> + Send + 'static,
179{
180    fn send(self: Box<Self>, ctx: &IoContext, res: Result<R, E>) {
181        self.0.send(ctx, res)
182    }
183
184    fn as_self(&self) -> &G {
185        &self.1
186    }
187
188    fn as_mut_self(&mut self) -> &mut G {
189        &mut self.1
190    }
191}
192
193impl IoContext {
194    pub fn spawn<F>(ctx: &IoContext, func: F)
195        where F: FnOnce(Coroutine) + 'static
196    {
197        let data = InitData {
198            stack: ProtectedFixedSizeStack::default(),
199            ctx: ctx.clone(),
200            func: Box::new(func),
201        };
202
203        let context = Context::new(&data.stack, coro_entry);
204        let mut data_opt = Some(data);
205        let Transfer { context, data } = context.resume(&mut data_opt as *mut _ as usize);
206        let coro = unsafe { &*(data as *const StrandImmutable<Option<Context>>) };
207        *unsafe { coro.as_mut() } = Some(context);
208
209        coro.post(move |mut coro| {
210            let Transfer { context, data } = coro.take().unwrap().resume(0);
211            if data == 0 {
212                *coro = Some(context)
213            }
214        })
215    }
216}
217
// Spawns a coroutine with an empty body and runs the context.
#[test]
fn test_spawn_0() {
    let ctx = IoContext::new().unwrap();
    IoContext::spawn(&ctx, |_| {});
    ctx.run();
}
224
// Issues two `async_send` calls on a fresh UDP socket from inside a
// coroutine and asserts that each of them reports an error.
#[test]
fn test_spawn_1() {
    use ip::{IpProtocol, Udp, UdpSocket};

    let ctx = IoContext::new().unwrap();
    IoContext::spawn(&ctx, |coro| {
        let io = coro.as_ctx();
        let udp = UdpSocket::new(io, Udp::v4()).unwrap();
        let buf = [0; 256];
        for _ in 0..2 {
            assert!(udp.async_send(&buf, 0, coro.wrap()).is_err());
        }
    });
    ctx.run();
}