Function may::config

source ·
pub fn config() -> Config
Expand description

Returns the `may` configuration instance.

Examples found in repository?
src/pool.rs (line 18)
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
    /// Builds a placeholder coroutine that uses the configured stack size.
    ///
    /// The closure installed here panics through `unreachable!` if it is ever
    /// executed.
    fn create_dummy_coroutine() -> CoroutineImpl {
        let stack_size = config().get_stack_size();
        Gn::new_opt(stack_size, move || {
            unreachable!("dummy coroutine should never be called");
        })
    }

    /// Creates a pool pre-filled with `config().get_pool_capacity()` dummy
    /// coroutines and records that count as the initial size.
    pub fn new() -> Self {
        let capacity = config().get_pool_capacity();
        let queue = SegQueue::new();
        // fill the queue up front so early `get` calls do not have to allocate
        (0..capacity).for_each(|_| queue.push(Self::create_dummy_coroutine()));

        CoroutinePool {
            pool: queue,
            size: AtomicUsize::new(capacity),
        }
    }

    /// Takes a raw coroutine out of the pool.
    ///
    /// The size counter is decremented up front; when the queue turns out to
    /// be empty the decrement is undone and a fresh dummy coroutine is built.
    #[inline]
    pub fn get(&self) -> CoroutineImpl {
        self.size.fetch_sub(1, Ordering::AcqRel);
        if let Some(co) = self.pool.pop() {
            return co;
        }
        // nothing pooled: restore the counter and create a new coroutine
        self.size.fetch_add(1, Ordering::AcqRel);
        Self::create_dummy_coroutine()
    }

    /// Hands a raw coroutine back to the pool.
    ///
    /// The coroutine is dropped instead of queued when the size counter had
    /// already reached `config().get_pool_capacity()` before this call.
    #[inline]
    pub fn put(&self, co: CoroutineImpl) {
        let previous = self.size.fetch_add(1, Ordering::AcqRel);
        if previous < config().get_pool_capacity() {
            self.pool.push(co);
        } else {
            // pool is full: undo the reservation and discard the co
            self.size.fetch_sub(1, Ordering::AcqRel);
        }
    }
More examples
Hide additional examples
src/coroutine_impl.rs (line 79)
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
    /// Consumes a coroutine: frees its local storage, checks its stack usage
    /// and, when possible, returns it to the scheduler pool.
    ///
    /// The coroutine is put back into the pool only when the stack size
    /// reported by `co.stack_usage()` equals `config().get_stack_size()`;
    /// otherwise it is dropped when this function returns.
    ///
    /// Exits the process with status 1 when the reported used size equals the
    /// total size, which this code treats as a stack overflow.
    fn drop_coroutine(co: CoroutineImpl) {
        // assert!(co.is_done(), "unfinished coroutine detected");
        // just consume the coroutine
        // destroy the local storage: re-box the raw pointer so it is freed
        // when `local` goes out of scope at the end of this function
        // NOTE(review): presumably this is the pointer stored through
        // `set_local_data(Box::into_raw(..))` in `spawn_impl` — confirm
        let local = unsafe { Box::from_raw(get_co_local(&co)) };
        let name = local.get_co().name();

        // recycle the coroutine
        let (size, used) = co.stack_usage();
        if used == size {
            // a fully used stack is reported as an overflow and is fatal
            eprintln!("stack overflow detected, size={}", size);
            ::std::process::exit(1);
        }
        // show the actual used stack size in debug log
        // (an odd stack size on the handle is what switches this print on)
        if local.get_co().stack_size() & 1 == 1 {
            println!(
                "coroutine name = {:?}, stack size = {},  used size = {}",
                name, size, used
            );
        }

        // only coroutines with the configured default stack size are pooled
        if size == config().get_stack_size() {
            get_scheduler().pool.put(co);
        }
    }
}

impl EventSource for Done {
    fn subscribe(&mut self, co: CoroutineImpl) {
        Self::drop_coroutine(co);
    }
}

/// Coroutines are `'static` generators.
/// The parameter type is `EventResult` and the result type is `EventSubscriber`.
pub type CoroutineImpl = Generator<'static, EventResult, EventSubscriber>;

/// Returns the coroutine's local-data pointer reinterpreted as a pointer to
/// `CoroutineLocal`. No validity or null check is performed here.
#[inline]
#[allow(clippy::cast_ptr_alignment)]
fn get_co_local(co: &CoroutineImpl) -> *mut CoroutineLocal {
    let raw = co.get_local_data();
    raw as *mut CoroutineLocal
}

////////////////////////////////////////////////////////////////////////////////
// Coroutine
////////////////////////////////////////////////////////////////////////////////

/// The internal representation of a `Coroutine` handle
struct Inner {
    // optional name, exposed through `Coroutine::name`
    name: Option<String>,
    // stack size recorded at construction, exposed through `Coroutine::stack_size`
    stack_size: usize,
    // park state driven by `Coroutine::unpark`
    park: Park,
    // cancel state driven by `Coroutine::cancel`
    cancel: Cancel,
}

#[derive(Clone)]
/// A handle to a coroutine.
///
/// Cloning the handle clones the inner `Arc`, so all clones share the same
/// `Inner` state.
pub struct Coroutine {
    // reference-counted state shared by every clone of this handle
    inner: Arc<Inner>,
}

impl Coroutine {
    // Internal constructor: builds the handle object only, nothing is spawned
    fn new(name: Option<String>, stack_size: usize) -> Coroutine {
        let inner = Inner {
            name,
            stack_size,
            park: Park::new(),
            cancel: Cancel::new(),
        };
        Coroutine {
            inner: Arc::new(inner),
        }
    }

    /// Returns the stack size recorded for this coroutine.
    pub fn stack_size(&self) -> usize {
        self.inner.stack_size
    }

    /// Atomically makes the handle's token available if it is not already.
    pub fn unpark(&self) {
        self.inner.park.unpark();
    }

    /// Cancels the coroutine.
    ///
    /// # Safety
    ///
    /// This forces the coroutine to exit the next time it is scheduled and
    /// drops every resource the coroutine is currently holding. That may have
    /// unexpected side effects if you are not fully aware of them.
    pub unsafe fn cancel(&self) {
        self.inner.cancel.cancel();
    }

    /// Returns the coroutine name, if one was set.
    pub fn name(&self) -> Option<&str> {
        self.inner.name.as_deref()
    }

    /// Returns the internal cancel object.
    #[cfg(all(unix, feature = "io_cancel"))]
    pub(crate) fn get_cancel(&self) -> &Cancel {
        &self.inner.cancel
    }
}

impl fmt::Debug for Coroutine {
    /// Formats the handle as its optional name, forwarding the formatter as is.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = self.name();
        <Option<&str> as fmt::Debug>::fmt(&name, f)
    }
}

////////////////////////////////////////////////////////////////////////////////
// Builder
////////////////////////////////////////////////////////////////////////////////

/// Coroutine factory, which can be used in order to configure the properties of
/// a new coroutine.
///
/// Methods can be chained on it in order to configure it.
///
/// The two configurations available are:
///
/// - [`name`]: specifies an [associated name for the coroutine][naming-coroutines]
/// - [`stack_size`]: specifies the [desired stack size for the coroutine][stack-size]
///
/// The [`spawn`] method will take ownership of the builder and create an
/// `io::Result` to the coroutine handle with the given configuration.
///
/// The [`coroutine::spawn`] free function uses a `Builder` with default
/// configuration and `unwrap`s its return value.
///
/// You may want to use [`spawn`] instead of [`coroutine::spawn`] when you want
/// to recover from a failure to launch a coroutine: the free function will
/// panic where the `Builder` method will return an `io::Result`.
///
/// # Examples
///
/// ```
/// use may::coroutine;
///
/// let builder = coroutine::Builder::new();
/// let code = || {
///     // coroutine code
/// };
///
/// let handler = unsafe { builder.spawn(code).unwrap() };
///
/// handler.join().unwrap();
/// ```
///
/// [`coroutine::spawn`]: ./fn.spawn.html
/// [`stack_size`]: ./struct.Builder.html#method.stack_size
/// [`name`]: ./struct.Builder.html#method.name
/// [`spawn`]: ./struct.Builder.html#method.spawn
/// [naming-coroutines]: ./index.html#naming-coroutine
/// [stack-size]: ./index.html#stack-size
#[derive(Default)]
pub struct Builder {
    // A name for the coroutine-to-be, for identification in panic messages
    name: Option<String>,
    // The size of the stack for the spawned coroutine; when `None`, the
    // configured default stack size is used at spawn time
    stack_size: Option<usize>,
}

impl Builder {
    /// Generates the base configuration for spawning a coroutine, from which
    /// configuration methods can be chained.
    pub fn new() -> Builder {
        Builder {
            name: None,
            stack_size: None,
        }
    }

    /// Names the thread-to-be. Currently the name is used for identification
    /// only in panic messages.
    pub fn name(mut self, name: String) -> Builder {
        self.name = Some(name);
        self
    }

    /// Sets the size of the stack for the new coroutine.
    pub fn stack_size(mut self, size: usize) -> Builder {
        self.stack_size = Some(size);
        self
    }

    /// Builds a new coroutine and returns it together with a join handle.
    /// The join handle can be used to block on
    /// termination of the child coroutine, including recovering its panics.
    ///
    /// `f` is the code the coroutine will run; its return value is stored in
    /// the packet shared with the join handle. When the builder carries no
    /// explicit stack size, `config().get_stack_size()` is used.
    ///
    /// This function only constructs the coroutine; nothing in this body
    /// starts it. The visible body has no error path and always returns `Ok`.
    fn spawn_impl<F, T>(self, f: F) -> io::Result<(CoroutineImpl, JoinHandle<T>)>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        // single shared event source that every coroutine returns when it finishes
        static DONE: Done = Done {};

        let sched = get_scheduler();
        let Builder { name, stack_size } = self;
        let stack_size = stack_size.unwrap_or_else(|| config().get_stack_size());

        // create a join resource, shared by waited coroutine and *this* coroutine
        let panic = Arc::new(AtomicCell::new(None));
        let join = Arc::new(Join::new(panic.clone()));
        let packet = Arc::new(AtomicCell::new(None));
        let their_join = join.clone();
        let their_packet = packet.clone();

        // the subscriber points at the static `DONE`; the cast chain turns the
        // shared reference into the `*mut dyn EventSource` the field expects
        let subscriber = EventSubscriber {
            resource: &DONE as &dyn EventSource as *const _ as *mut dyn EventSource,
        };

        let closure = move || {
            // trigger the JoinHandler
            // we must declare the variable before calling f so that stack is prepared
            // to unwind these local data. for the panic err we would set it in the
            // coroutine local data so that can return from the packet variable

            // set the return packet
            their_packet.swap(Some(f()));

            their_join.trigger();
            subscriber
        };

        // default-sized coroutines are taken from the scheduler pool and
        // re-initialised with the closure; other sizes get a fresh generator
        let mut co = if stack_size == config().get_stack_size() {
            let mut co = sched.pool.get();
            co.init_code(closure);
            co
        } else {
            Gn::new_opt(stack_size, closure)
        };

        let handle = Coroutine::new(name, stack_size);
        // create the local storage
        let local = CoroutineLocal::new(handle.clone(), join.clone());
        // attach the local storage to the coroutine as a raw pointer; the box
        // is leaked here and has to be reclaimed elsewhere
        co.set_local_data(Box::into_raw(local) as *mut u8);

        Ok((co, make_join_handle(handle, join, packet, panic)))
    }
src/scheduler.rs (line 38)
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/// Builds the global scheduler and starts its background threads: one timer
/// thread plus one event-loop thread per configured worker.
fn init_scheduler() {
    let workers = config().get_workers();
    let sched: Box<Scheduler> = Scheduler::new(workers);
    // leak the box into the global pointer; the spawned threads read it back
    unsafe { SCHED = Box::into_raw(sched) };

    // timer thread
    thread::spawn(move || {
        // timer callback: give the coroutine a timeout error and run it again
        let on_timeout = |slot: Arc<AtomicOption<CoroutineImpl>>| {
            if let Some(mut co) = slot.take(Ordering::Relaxed) {
                // set the timeout result for the coroutine
                let timeout = io::Error::new(io::ErrorKind::TimedOut, "timeout");
                set_co_para(&mut co, timeout);
                run_coroutine(co);
            }
        };

        let scheduler = unsafe { &*SCHED };
        scheduler.timer_thread.run(&on_timeout);
    });

    // io event loop threads, one per worker id
    (0..workers).for_each(|worker_id| {
        thread::spawn(move || {
            let scheduler = unsafe { &*SCHED };
            scheduler.event_loop.run(worker_id);
        });
    });
}