pub fn config() -> Config
Expand description
get the may configuration instance
Examples found in repository?
src/pool.rs (line 18)
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
fn create_dummy_coroutine() -> CoroutineImpl {
Gn::new_opt(config().get_stack_size(), move || {
unreachable!("dummy coroutine should never be called");
})
}
pub fn new() -> Self {
let capacity = config().get_pool_capacity();
let pool = SegQueue::new();
for _ in 0..capacity {
let co = Self::create_dummy_coroutine();
pool.push(co);
}
let size = AtomicUsize::new(capacity);
CoroutinePool { pool, size }
}
/// get a raw coroutine from the pool
#[inline]
pub fn get(&self) -> CoroutineImpl {
self.size.fetch_sub(1, Ordering::AcqRel);
match self.pool.pop() {
Some(co) => co,
None => {
self.size.fetch_add(1, Ordering::AcqRel);
Self::create_dummy_coroutine()
}
}
}
/// put a raw coroutine into the pool
#[inline]
pub fn put(&self, co: CoroutineImpl) {
// discard the co if push failed
let m = self.size.fetch_add(1, Ordering::AcqRel);
if m >= config().get_pool_capacity() {
self.size.fetch_sub(1, Ordering::AcqRel);
return;
}
self.pool.push(co);
}
More examples
src/coroutine_impl.rs (line 79)
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300
fn drop_coroutine(co: CoroutineImpl) {
// assert!(co.is_done(), "unfinished coroutine detected");
// just consume the coroutine
// destroy the local storage
let local = unsafe { Box::from_raw(get_co_local(&co)) };
let name = local.get_co().name();
// recycle the coroutine
let (size, used) = co.stack_usage();
if used == size {
eprintln!("stack overflow detected, size={}", size);
::std::process::exit(1);
}
// show the actual used stack size in debug log
if local.get_co().stack_size() & 1 == 1 {
println!(
"coroutine name = {:?}, stack size = {}, used size = {}",
name, size, used
);
}
if size == config().get_stack_size() {
get_scheduler().pool.put(co);
}
}
}
impl EventSource for Done {
fn subscribe(&mut self, co: CoroutineImpl) {
Self::drop_coroutine(co);
}
}
/// coroutines are static generator
/// the para type is EventResult, the result type is EventSubscriber
pub type CoroutineImpl = Generator<'static, EventResult, EventSubscriber>;
#[inline]
#[allow(clippy::cast_ptr_alignment)]
fn get_co_local(co: &CoroutineImpl) -> *mut CoroutineLocal {
co.get_local_data() as *mut CoroutineLocal
}
/// /////////////////////////////////////////////////////////////////////////////
/// Coroutine
/// /////////////////////////////////////////////////////////////////////////////
/// The internal representation of a `Coroutine` handle
struct Inner {
name: Option<String>,
stack_size: usize,
park: Park,
cancel: Cancel,
}
#[derive(Clone)]
/// A handle to a coroutine.
pub struct Coroutine {
inner: Arc<Inner>,
}
impl Coroutine {
// Used only internally to construct a coroutine object without spawning
fn new(name: Option<String>, stack_size: usize) -> Coroutine {
Coroutine {
inner: Arc::new(Inner {
name,
stack_size,
park: Park::new(),
cancel: Cancel::new(),
}),
}
}
/// Gets the coroutine stack size.
pub fn stack_size(&self) -> usize {
self.inner.stack_size
}
/// Atomically makes the handle's token available if it is not already.
pub fn unpark(&self) {
self.inner.park.unpark();
}
/// cancel a coroutine
/// # Safety
///
/// This function would force a coroutine exist when next scheduling
/// And would drop all the resource tha the coroutine currently holding
/// This may have unexpected side effects if you are not fully aware it
pub unsafe fn cancel(&self) {
self.inner.cancel.cancel();
}
/// Gets the coroutine name.
pub fn name(&self) -> Option<&str> {
self.inner.name.as_deref()
}
/// Get the internal cancel
#[cfg(unix)]
#[cfg(feature = "io_cancel")]
pub(crate) fn get_cancel(&self) -> &Cancel {
&self.inner.cancel
}
}
impl fmt::Debug for Coroutine {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.name(), f)
}
}
////////////////////////////////////////////////////////////////////////////////
// Builder
////////////////////////////////////////////////////////////////////////////////
/// Coroutine factory, which can be used in order to configure the properties of
/// a new coroutine.
///
/// Methods can be chained on it in order to configure it.
///
/// The two configurations available are:
///
/// - [`name`]: specifies an [associated name for the coroutine][naming-coroutines]
/// - [`stack_size`]: specifies the [desired stack size for the coroutine][stack-size]
///
/// The [`spawn`] method will take ownership of the builder and create an
/// `io::Result` to the coroutine handle with the given configuration.
///
/// The [`coroutine::spawn`] free function uses a `Builder` with default
/// configuration and `unwrap`s its return value.
///
/// You may want to use [`spawn`] instead of [`coroutine::spawn`], when you want
/// to recover from a failure to launch a coroutine, indeed the free function will
/// panics where the `Builder` method will return a `io::Result`.
///
/// # Examples
///
/// ```
/// use may::coroutine;
///
/// let builder = coroutine::Builder::new();
/// let code = || {
/// // coroutine code
/// };
///
/// let handler = unsafe { builder.spawn(code).unwrap() };
///
/// handler.join().unwrap();
/// ```
///
/// [`coroutine::spawn`]: ./fn.spawn.html
/// [`stack_size`]: ./struct.Builder.html#method.stack_size
/// [`name`]: ./struct.Builder.html#method.name
/// [`spawn`]: ./struct.Builder.html#method.spawn
/// [naming-coroutines]: ./index.html#naming-coroutine
/// [stack-size]: ./index.html#stack-siz
#[derive(Default)]
pub struct Builder {
// A name for the coroutine-to-be, for identification in panic messages
name: Option<String>,
// The size of the stack for the spawned coroutine
stack_size: Option<usize>,
}
impl Builder {
/// Generates the base configuration for spawning a coroutine, from which
/// configuration methods can be chained.
pub fn new() -> Builder {
Builder {
name: None,
stack_size: None,
}
}
/// Names the thread-to-be. Currently the name is used for identification
/// only in panic messages.
pub fn name(mut self, name: String) -> Builder {
self.name = Some(name);
self
}
/// Sets the size of the stack for the new coroutine.
pub fn stack_size(mut self, size: usize) -> Builder {
self.stack_size = Some(size);
self
}
/// Spawns a new coroutine, and returns a join handle for it.
/// The join handle can be used to block on
/// termination of the child coroutine, including recovering its panics.
fn spawn_impl<F, T>(self, f: F) -> io::Result<(CoroutineImpl, JoinHandle<T>)>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
static DONE: Done = Done {};
let sched = get_scheduler();
let Builder { name, stack_size } = self;
let stack_size = stack_size.unwrap_or_else(|| config().get_stack_size());
// create a join resource, shared by waited coroutine and *this* coroutine
let panic = Arc::new(AtomicCell::new(None));
let join = Arc::new(Join::new(panic.clone()));
let packet = Arc::new(AtomicCell::new(None));
let their_join = join.clone();
let their_packet = packet.clone();
let subscriber = EventSubscriber {
resource: &DONE as &dyn EventSource as *const _ as *mut dyn EventSource,
};
let closure = move || {
// trigger the JoinHandler
// we must declare the variable before calling f so that stack is prepared
// to unwind these local data. for the panic err we would set it in the
// coroutine local data so that can return from the packet variable
// set the return packet
their_packet.swap(Some(f()));
their_join.trigger();
subscriber
};
let mut co = if stack_size == config().get_stack_size() {
let mut co = sched.pool.get();
co.init_code(closure);
co
} else {
Gn::new_opt(stack_size, closure)
};
let handle = Coroutine::new(name, stack_size);
// create the local storage
let local = CoroutineLocal::new(handle.clone(), join.clone());
// attache the local storage to the coroutine
co.set_local_data(Box::into_raw(local) as *mut u8);
Ok((co, make_join_handle(handle, join, packet, panic)))
}
src/scheduler.rs (line 38)
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
fn init_scheduler() {
let workers = config().get_workers();
let b: Box<Scheduler> = Scheduler::new(workers);
unsafe { SCHED = Box::into_raw(b) };
// timer thread
thread::spawn(move || {
// timer function
let timer_event_handler = |c: Arc<AtomicOption<CoroutineImpl>>| {
// just re-push the co to the visit list
if let Some(mut co) = c.take(Ordering::Relaxed) {
// set the timeout result for the coroutine
set_co_para(&mut co, io::Error::new(io::ErrorKind::TimedOut, "timeout"));
// s.schedule_global(c);
run_coroutine(co);
}
};
let s = unsafe { &*SCHED };
s.timer_thread.run(&timer_event_handler);
});
// io event loop thread
for id in 0..workers {
thread::spawn(move || {
let s = unsafe { &*SCHED };
s.event_loop.run(id);
});
}
}