#![deny(
anonymous_parameters,
bare_trait_objects,
missing_copy_implementations,
missing_debug_implementations,
missing_docs, single_use_lifetimes, trivial_casts, trivial_numeric_casts,
unstable_features,
unused_extern_crates,
unused_import_braces,
unused_qualifications,
unused_results,
variant_size_differences,
warnings, clippy::all,
clippy::pedantic,
clippy::cargo,
unreachable_pub,
)]
#![allow(
clippy::blanket_clippy_restriction_lints, clippy::implicit_return, clippy::module_name_repetitions, clippy::multiple_crate_versions, clippy::missing_errors_doc, clippy::missing_panics_doc, clippy::panic_in_result_fn,
clippy::shadow_same, clippy::shadow_reuse, clippy::exhaustive_enums,
clippy::exhaustive_structs,
clippy::indexing_slicing,
clippy::separated_literal_suffix, clippy::single_char_lifetime_names, )]
use open_coroutine_core::co_pool::task::UserTaskFunc;
use open_coroutine_core::common::constants::SLICE;
pub use open_coroutine_core::net::config::Config;
use open_coroutine_core::net::UserFunc;
pub use open_coroutine_macros::*;
use std::cmp::Ordering;
use std::ffi::{c_int, c_longlong, c_uint, c_void};
use std::io::{Error, ErrorKind};
use std::marker::PhantomData;
use std::net::{TcpStream, ToSocketAddrs};
use std::ops::Deref;
use std::time::Duration;
extern "C" {
fn open_coroutine_init(config: Config) -> c_int;
fn open_coroutine_stop(secs: c_uint) -> c_int;
fn maybe_grow_stack(
red_zone: usize,
stack_size: usize,
f: UserFunc,
param: usize,
) -> c_longlong;
}
#[allow(improper_ctypes)]
extern "C" {
fn task_crate(f: UserTaskFunc, param: usize) -> open_coroutine_core::net::join::JoinHandle;
fn task_join(handle: &open_coroutine_core::net::join::JoinHandle) -> c_longlong;
fn task_timeout_join(
handle: &open_coroutine_core::net::join::JoinHandle,
ns_time: u64,
) -> c_longlong;
}
pub fn init(config: Config) {
assert_eq!(
0,
unsafe { open_coroutine_init(config) },
"open-coroutine init failed !"
);
}
pub fn shutdown() {
unsafe { _ = open_coroutine_stop(30) };
}
#[macro_export]
macro_rules! task {
( $f: expr , $param:expr $(,)? ) => {
$crate::task($f, $param)
};
}
pub fn task<P: 'static, R: 'static, F: FnOnce(P) -> R>(f: F, param: P) -> JoinHandle<R> {
extern "C" fn task_main<P: 'static, R: 'static, F: FnOnce(P) -> R>(input: usize) -> usize {
unsafe {
let ptr = &mut *((input as *mut c_void).cast::<(F, P)>());
let data = std::ptr::read_unaligned(ptr);
let result: &'static mut R = Box::leak(Box::new((data.0)(data.1)));
std::ptr::from_mut::<R>(result).cast::<c_void>() as usize
}
}
let inner = Box::leak(Box::new((f, param)));
unsafe {
task_crate(
task_main::<P, R, F>,
std::ptr::from_mut::<(F, P)>(inner).cast::<c_void>() as usize,
)
.into()
}
}
#[allow(missing_docs)]
#[repr(C)]
#[derive(Debug)]
pub struct JoinHandle<R>(open_coroutine_core::net::join::JoinHandle, PhantomData<R>);
#[allow(missing_docs)]
impl<R> JoinHandle<R> {
#[allow(clippy::cast_possible_truncation)]
pub fn timeout_join(&self, dur: Duration) -> std::io::Result<Option<R>> {
unsafe {
let ptr = task_timeout_join(self, dur.as_nanos() as u64);
match ptr.cmp(&0) {
Ordering::Less => Err(Error::new(ErrorKind::Other, "timeout join failed")),
Ordering::Equal => Ok(None),
Ordering::Greater => Ok(Some(std::ptr::read_unaligned(ptr as *mut R))),
}
}
}
pub fn join(self) -> std::io::Result<Option<R>> {
unsafe {
let ptr = task_join(&self);
match ptr.cmp(&0) {
Ordering::Less => Err(Error::new(ErrorKind::Other, "join failed")),
Ordering::Equal => Ok(None),
Ordering::Greater => Ok(Some(std::ptr::read_unaligned(ptr as *mut R))),
}
}
}
}
impl<R> From<open_coroutine_core::net::join::JoinHandle> for JoinHandle<R> {
fn from(val: open_coroutine_core::net::join::JoinHandle) -> Self {
Self(val, PhantomData)
}
}
impl<R> From<JoinHandle<R>> for open_coroutine_core::net::join::JoinHandle {
fn from(val: JoinHandle<R>) -> Self {
val.0
}
}
impl<R> Deref for JoinHandle<R> {
type Target = open_coroutine_core::net::join::JoinHandle;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[macro_export]
macro_rules! maybe_grow {
($red_zone:expr, $stack_size:expr, $f:expr $(,)?) => {
$crate::maybe_grow($red_zone, $stack_size, $f)
};
($stack_size:literal, $f:expr $(,)?) => {
$crate::maybe_grow(
open_coroutine_core::common::default_red_zone(),
$stack_size,
$f,
)
};
($f:expr $(,)?) => {
$crate::maybe_grow(
open_coroutine_core::common::default_red_zone(),
open_coroutine_core::common::constants::DEFAULT_STACK_SIZE,
$f,
)
};
}
pub fn maybe_grow<R: 'static, F: FnOnce() -> R>(
red_zone: usize,
stack_size: usize,
f: F,
) -> std::io::Result<R> {
extern "C" fn execute_on_stack<R: 'static, F: FnOnce() -> R>(input: usize) -> usize {
unsafe {
let ptr = &mut *((input as *mut c_void).cast::<F>());
let data = std::ptr::read_unaligned(ptr);
let result: &'static mut R = Box::leak(Box::new(data()));
std::ptr::from_mut::<R>(result).cast::<c_void>() as usize
}
}
let inner = Box::leak(Box::new(f));
unsafe {
let ptr = maybe_grow_stack(
red_zone,
stack_size,
execute_on_stack::<R, F>,
std::ptr::from_mut::<F>(inner).cast::<c_void>() as usize,
);
if ptr < 0 {
return Err(Error::new(ErrorKind::InvalidInput, "grow stack failed"));
}
Ok(*Box::from_raw(
usize::try_from(ptr).expect("overflow") as *mut R
))
}
}
pub fn connect_timeout<A: ToSocketAddrs>(addr: A, timeout: Duration) -> std::io::Result<TcpStream> {
let timeout_time = open_coroutine_core::common::get_timeout_time(timeout);
let mut last_err = None;
for addr in addr.to_socket_addrs()? {
loop {
let left_time = timeout_time.saturating_sub(open_coroutine_core::common::now());
if 0 == left_time {
break;
}
match TcpStream::connect_timeout(&addr, Duration::from_nanos(left_time).min(SLICE)) {
Ok(l) => return Ok(l),
Err(e) => last_err = Some(e),
}
}
}
Err(last_err.unwrap_or_else(|| {
Error::new(
ErrorKind::InvalidInput,
"could not resolve to any addresses",
)
}))
}
#[cfg(test)]
mod tests {
use crate::{init, shutdown};
use open_coroutine_core::net::config::Config;
#[test]
fn test() {
init(Config::single());
let join = task!(
|_| {
println!("Hello, world!");
},
(),
);
assert_eq!(Some(()), join.join().expect("join failed"));
shutdown();
}
}