maniac_runtime/generator/
gen_impl.rs

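//! # generator
//!
//! Stackful generator implementation: the public `Generator` / `LocalGenerator`
//! handles, the `Gn` constructors and the internal `GeneratorImpl` state.
//!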
5
6use crate::generator::detail::gen_init;
7use crate::generator::reg_context::RegContext;
8use crate::generator::rt::{Context, ContextStack, Error};
9use crate::generator::scope::Scope;
10use crate::generator::stack::{Func, Stack, StackBox};
11
12use std::any::Any;
13use std::fmt;
14use std::marker::PhantomData;
15use std::panic;
16use std::thread;
17
/// Stack size, in bytes, used by the `Gn` constructors that do not take an
/// explicit size.
// Windows enforces a minimum stack size of 0x4a8, so never go below that.
// NOTE(review): `stack_usage` documents its sizes "in word" — confirm which
// unit `Stack::new` actually expects.
pub const DEFAULT_STACK_SIZE: usize = 2 * 1024 * 1024;
21
/// Empty function marked `#[cold]`; calling it on a branch marks that branch
/// as rarely taken for the optimizer.
#[inline]
#[cold]
fn cold() {}

// The `likely` counterpart is currently disabled:
// #[inline]
// fn likely(b: bool) -> bool {
//     if !b { cold() }
//     b
// }

/// Returns `b` unchanged while hinting that `b == true` is the rare case.
#[inline]
pub(crate) fn unlikely(b: bool) -> bool {
    if !b {
        return false;
    }
    // only the `true` path touches the cold function
    cold();
    true
}
39
40/// the generator obj type, the functor passed to it must be Send
41pub struct GeneratorObj<'a, A, T, const LOCAL: bool> {
42    inner: StackBox<GeneratorImpl<'a, A, T>>,
43}
44
45/// the generator type, the functor passed to it must be Send
46pub type Generator<'a, A, T> = GeneratorObj<'a, A, T, false>;
47
48// only when A, T and Functor are all sendable, the generator could be send
49unsafe impl<A: Send, T: Send> Send for Generator<'static, A, T> {}
50
51impl<'a, A, T> Generator<'a, A, T> {
52    /// init a heap based generator with scoped closure
53    pub fn scoped_init<F>(&mut self, f: F)
54    where
55        for<'scope> F: FnOnce(Scope<'scope, 'a, A, T>) -> T + Send + 'a,
56        T: Send + 'a,
57        A: Send + 'a,
58    {
59        self.inner.scoped_init(f);
60    }
61
62    /// init a heap based generator
63    // it's can be used to re-init a 'done' generator before it's get dropped
64    pub fn init_code<F: FnOnce() -> T + Send + 'a>(&mut self, f: F)
65    where
66        T: Send + 'a,
67    {
68        self.inner.init_code(f);
69    }
70}
71
72/// the local generator type, can't Send
73pub type LocalGenerator<'a, A, T> = GeneratorObj<'a, A, T, true>;
74
75impl<'a, A, T> LocalGenerator<'a, A, T> {
76    /// init a heap based generator with scoped closure
77    pub fn scoped_init<F>(&mut self, f: F)
78    where
79        for<'scope> F: FnOnce(Scope<'scope, 'a, A, T>) -> T + 'a,
80        T: 'a,
81        A: 'a,
82    {
83        self.inner.scoped_init(f);
84    }
85}
86
87impl<'a, A, T, const LOCAL: bool> GeneratorObj<'a, A, T, LOCAL> {
88    /// Constructs a Generator from a raw pointer.
89    ///
90    /// # Safety
91    ///
92    /// This function is unsafe because improper use may lead to
93    /// memory problems. For example, a double-free may occur if the
94    /// function is called twice on the same raw pointer.
95    #[inline]
96    pub unsafe fn from_raw(raw: *mut usize) -> Self {
97        GeneratorObj {
98            inner: unsafe { StackBox::from_raw(raw as *mut GeneratorImpl<'a, A, T>) },
99        }
100    }
101
102    /// Consumes the `Generator`, returning a wrapped raw pointer.
103    #[inline]
104    pub fn into_raw(self) -> *mut usize {
105        let ret = self.inner.as_ptr() as *mut usize;
106        std::mem::forget(self);
107        ret
108    }
109
110    /// prefetch the generator into cache
111    #[inline]
112    pub fn prefetch(&self) {
113        self.inner.prefetch();
114    }
115
116    /// prepare the para that passed into generator before send
117    #[inline]
118    pub fn set_para(&mut self, para: A) {
119        self.inner.set_para(para);
120    }
121
122    /// set the generator local data
123    #[inline]
124    pub fn set_local_data(&mut self, data: *mut u8) {
125        self.inner.set_local_data(data);
126    }
127
128    /// get the generator local data
129    #[inline]
130    pub fn get_local_data(&self) -> *mut u8 {
131        self.inner.get_local_data()
132    }
133
134    /// get the generator panic data
135    #[inline]
136    pub fn get_panic_data(&mut self) -> Option<Box<dyn Any + Send>> {
137        self.inner.get_panic_data()
138    }
139
140    /// resume the generator without touch the para
141    /// you should call `set_para` before this method
142    #[inline]
143    pub fn resume(&mut self) -> Option<T> {
144        self.inner.resume()
145    }
146
147    /// `raw_send`
148    #[inline]
149    pub fn raw_send(&mut self, para: Option<A>) -> Option<T> {
150        self.inner.raw_send(para)
151    }
152
153    /// send interface
154    pub fn send(&mut self, para: A) -> T {
155        self.inner.send(para)
156    }
157
158    /// cancel the generator
159    /// this will trigger a Cancel panic to unwind the stack and finish the generator
160    pub fn cancel(&mut self) {
161        self.inner.cancel()
162    }
163
164    /// is finished
165    #[inline]
166    pub fn is_done(&self) -> bool {
167        self.inner.is_done()
168    }
169
170    /// get stack total size and used size in word
171    pub fn stack_usage(&self) -> (usize, usize) {
172        self.inner.stack_usage()
173    }
174}
175
176impl<T, const LOCAL: bool> Iterator for GeneratorObj<'_, (), T, LOCAL> {
177    type Item = T;
178    fn next(&mut self) -> Option<T> {
179        self.resume()
180    }
181}
182
183impl<A, T, const LOCAL: bool> fmt::Debug for GeneratorObj<'_, A, T, LOCAL> {
184    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
185        write!(
186            f,
187            "Generator<{}, Output={}, Local={}> {{ ... }}",
188            std::any::type_name::<A>(),
189            std::any::type_name::<T>(),
190            LOCAL
191        )
192    }
193}
194
/// Generator helper: a namespace for the generator constructors.
///
/// `A` is the type of the values passed into the generators it creates and
/// defaults to `()`. The only field is a `PhantomData` marker.
pub struct Gn<A = ()> {
    dummy: PhantomData<A>,
}
199
200impl<A> Gn<A> {
201    /// create a scoped generator with default stack size
202    pub fn new_scoped<'a, T, F>(f: F) -> Generator<'a, A, T>
203    where
204        for<'scope> F: FnOnce(Scope<'scope, 'a, A, T>) -> T + Send + 'a,
205        T: Send + 'a,
206        A: Send + 'a,
207    {
208        Self::new_scoped_opt(DEFAULT_STACK_SIZE, f)
209    }
210
211    /// create a scoped local generator with default stack size
212    pub fn new_scoped_local<'a, T, F>(f: F) -> LocalGenerator<'a, A, T>
213    where
214        F: FnOnce(Scope<A, T>) -> T + 'a,
215        T: 'a,
216        A: 'a,
217    {
218        Self::new_scoped_opt_local(DEFAULT_STACK_SIZE, f)
219    }
220
221    /// create a scoped generator with specified stack size
222    pub fn new_scoped_opt<'a, T, F>(size: usize, f: F) -> Generator<'a, A, T>
223    where
224        for<'scope> F: FnOnce(Scope<'scope, 'a, A, T>) -> T + Send + 'a,
225        T: Send + 'a,
226        A: Send + 'a,
227    {
228        let mut inner_box = GeneratorImpl::<A, T>::new(Stack::new(size));
229        inner_box.scoped_init(f);
230        Generator { inner: inner_box }
231    }
232
233    /// create a scoped local generator with specified stack size
234    pub fn new_scoped_opt_local<'a, T, F>(size: usize, f: F) -> LocalGenerator<'a, A, T>
235    where
236        F: FnOnce(Scope<A, T>) -> T + 'a,
237        T: 'a,
238        A: 'a,
239    {
240        let mut inner_box = GeneratorImpl::<A, T>::new(Stack::new(size));
241        inner_box.scoped_init(f);
242        LocalGenerator { inner: inner_box }
243    }
244}
245
246impl<A: Any> Gn<A> {
247    /// create a new generator with default stack size
248    #[allow(clippy::new_ret_no_self)]
249    #[deprecated(since = "0.6.18", note = "please use `scope` version instead")]
250    pub fn new<'a, T: Any, F>(f: F) -> Generator<'a, A, T>
251    where
252        F: FnOnce() -> T + Send + 'a,
253    {
254        Self::new_opt(DEFAULT_STACK_SIZE, f)
255    }
256
257    /// create a new generator with specified stack size
258    // the `may` library use this API so we can't deprecated it yet.
259    pub fn new_opt<'a, T: Any, F>(size: usize, f: F) -> Generator<'a, A, T>
260    where
261        F: FnOnce() -> T + Send + 'a,
262    {
263        let mut inner_box = GeneratorImpl::<A, T>::new(Stack::new(size));
264        inner_box.init_context();
265        inner_box.init_code(f);
266        Generator { inner: inner_box }
267    }
268}
269
270/// `GeneratorImpl`
271#[repr(C)]
272struct GeneratorImpl<'a, A, T> {
273    // run time context
274    context: Context,
275    // stack
276    stack: Stack,
277    // save the input
278    para: Option<A>,
279    // save the output
280    ret: Option<T>,
281    // boxed functor
282    f: Option<Func>,
283    // phantom lifetime
284    phantom: PhantomData<&'a T>,
285}
286
287impl<A: Any, T: Any> GeneratorImpl<'_, A, T> {
288    /// create a new generator with default stack size
289    fn init_context(&mut self) {
290        unsafe {
291            std::ptr::write(
292                self.context.para.as_mut_ptr(),
293                &mut self.para as &mut dyn Any,
294            );
295            std::ptr::write(self.context.ret.as_mut_ptr(), &mut self.ret as &mut dyn Any);
296        }
297    }
298}
299
300impl<'a, A, T> GeneratorImpl<'a, A, T> {
301    /// create a new generator with specified stack size
302    fn new(mut stack: Stack) -> StackBox<Self> {
303        // the stack box would finally dealloc the stack!
304        unsafe {
305            let mut stack_box = stack.alloc_uninit_box::<GeneratorImpl<'a, A, T>>();
306            (*stack_box.as_mut_ptr()).init(GeneratorImpl {
307                para: None,
308                stack,
309                ret: None,
310                f: None,
311                context: Context::new(),
312                phantom: PhantomData,
313            });
314            stack_box.assume_init()
315        }
316    }
317
318    /// prefetch the generator into cache
319    #[inline]
320    pub fn prefetch(&self) {
321        self.context.regs.prefetch();
322    }
323
324    /// init a heap based generator with scoped closure
325    fn scoped_init<F>(&mut self, f: F)
326    where
327        for<'scope> F: FnOnce(Scope<'scope, 'a, A, T>) -> T + 'a,
328        T: 'a,
329        A: 'a,
330    {
331        use std::mem::transmute;
332        let scope: Scope<A, T> = unsafe { transmute(Scope::new(&mut self.para, &mut self.ret)) };
333        self.init_code(move || f(scope));
334    }
335
336    /// init a heap based generator
337    // it's can be used to re-init a 'done' generator before it's get dropped
338    fn init_code<F: FnOnce() -> T + 'a>(&mut self, f: F)
339    where
340        T: 'a,
341    {
342        // make sure the last one is finished
343        if self.f.is_none() && self.context._ref == 0 {
344            self.cancel();
345        } else {
346            let _ = self.f.take();
347        }
348
349        // init ctx parent to itself, this would be the new top
350        self.context.parent = &mut self.context;
351
352        // init the ref to 0 means that it's ready to start
353        self.context._ref = 0;
354        let ret = &mut self.ret as *mut _;
355        // alloc the function on stack
356        let func = StackBox::new_fn_once(&mut self.stack, move || {
357            let r = f();
358            unsafe { *ret = Some(r) };
359        });
360
361        self.f = Some(func);
362
363        let guard = (self.stack.begin() as usize, self.stack.end() as usize);
364        self.context.stack_guard = guard;
365        self.context.regs.init_with(
366            gen_init,
367            0,
368            &mut self.f as *mut _ as *mut usize,
369            &self.stack,
370        );
371    }
372
373    /// resume the generator
374    #[inline]
375    fn resume_gen(&mut self) {
376        let env = ContextStack::current();
377        // get the current regs
378        let cur = &mut env.top().regs;
379
380        // switch to new context, always use the top context's reg
381        // for normal generator self.context.parent == self.context
382        // for coroutine self.context.parent == top generator context
383        debug_assert!(!self.context.parent.is_null());
384        let top = unsafe { &mut *self.context.parent };
385
386        // save current generator context on stack
387        env.push_context(&mut self.context);
388
389        // swap to the generator
390        RegContext::swap(cur, &top.regs);
391
392        // comes back, check the panic status
393        // this would propagate the panic until root context
394        // if it's a coroutine just stop propagate
395        if !self.context.local_data.is_null() {
396            return;
397        }
398
399        if let Some(err) = self.context.err.take() {
400            // pass the error to the parent until root
401            panic::resume_unwind(err);
402        }
403    }
404
405    #[inline]
406    fn is_started(&self) -> bool {
407        // when the f is consumed we think it's running
408        self.f.is_none()
409    }
410
411    /// prepare the para that passed into generator before send
412    #[inline]
413    fn set_para(&mut self, para: A) {
414        self.para = Some(para);
415    }
416
417    /// set the generator local data
418    #[inline]
419    fn set_local_data(&mut self, data: *mut u8) {
420        self.context.local_data = data;
421    }
422
423    /// get the generator local data
424    #[inline]
425    fn get_local_data(&self) -> *mut u8 {
426        self.context.local_data
427    }
428
429    /// get the generator panic data
430    #[inline]
431    fn get_panic_data(&mut self) -> Option<Box<dyn Any + Send>> {
432        self.context.err.take()
433    }
434
435    /// resume the generator without touch the para
436    /// you should call `set_para` before this method
437    #[inline]
438    fn resume(&mut self) -> Option<T> {
439        if unlikely(self.is_done()) {
440            return None;
441        }
442
443        // every time we call the function, increase the ref count
444        // yield will decrease it and return will not
445        self.context._ref += 1;
446        self.resume_gen();
447
448        self.ret.take()
449    }
450
451    /// `raw_send`
452    #[inline]
453    fn raw_send(&mut self, para: Option<A>) -> Option<T> {
454        if unlikely(self.is_done()) {
455            return None;
456        }
457
458        // this is the passed in value of the send primitive
459        // the yield part would read out this value in the next round
460        self.para = para;
461
462        // every time we call the function, increase the ref count
463        // yield will decrease it and return will not
464        self.context._ref += 1;
465        self.resume_gen();
466
467        self.ret.take()
468    }
469
470    /// send interface
471    fn send(&mut self, para: A) -> T {
472        let ret = self.raw_send(Some(para));
473        ret.expect("send got None return")
474    }
475
476    /// cancel the generator without any check
477    #[inline]
478    fn raw_cancel(&mut self) {
479        // tell the func to panic
480        // so that we can stop the inner func
481        self.context._ref = 2;
482        // save the old panic hook, we don't want to print anything for the Cancel
483        let old = panic::take_hook();
484        panic::set_hook(Box::new(|_| {}));
485        self.resume_gen();
486        panic::set_hook(old);
487    }
488
489    /// cancel the generator
490    /// this will trigger a Cancel panic to unwind the stack
491    fn cancel(&mut self) {
492        if self.is_done() {
493            return;
494        }
495
496        // consume the fun if it's not started
497        if !self.is_started() {
498            self.f.take();
499            self.context._ref = 1;
500        } else {
501            self.raw_cancel();
502        }
503    }
504
505    /// is finished
506    #[inline]
507    fn is_done(&self) -> bool {
508        self.is_started() && (self.context._ref & 0x3) != 0
509    }
510
511    /// get stack total size and used size in word
512    fn stack_usage(&self) -> (usize, usize) {
513        (self.stack.size(), self.stack.get_used_size())
514    }
515}
516
517impl<A, T> Drop for GeneratorImpl<'_, A, T> {
518    fn drop(&mut self) {
519        // when the thread is already panic, do nothing
520        if thread::panicking() {
521            return;
522        }
523
524        if !self.is_started() {
525            // not started yet, just drop the gen
526            return;
527        }
528
529        if !self.is_done() {
530            log::trace!("generator is not done while drop");
531            self.raw_cancel()
532        }
533
534        assert!(self.is_done());
535
536        let (total_stack, used_stack) = self.stack_usage();
537        if used_stack < total_stack {
538            // here we should record the stack in the class
539            // next time will just use
540            // set_stack_size::<F>(used_stack);
541        } else {
542            log::error!("stack overflow detected!");
543            panic::panic_any(Error::StackErr);
544        }
545    }
546}