#![allow(deprecated)]
use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
use std::{sync::mpsc, thread};
use ntex::rt::{self, Arbiter, Handle, System};
use ntex::time::{Millis, sleep};
#[ntex::test]
async fn test_join_handle() {
fn __assert_send<Fut, T>(f: Fut) -> Fut
where
Fut: Future<Output = T> + Send,
{
f
}
let hnd = rt::Handle::current();
let f = rt::spawn(async { "test" });
let f = __assert_send(f);
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
let runner = crate::System::build()
.stop_on_panic(true)
.build(ntex::rt::DefaultRuntime);
let result = runner.block_on(f).unwrap();
assert_eq!(result, "test");
let runner = crate::System::build()
.stop_on_panic(true)
.build(ntex::rt::DefaultRuntime);
let result = runner.block_on(hnd.spawn(async { "test2" })).unwrap();
let _ = tx.send(result);
});
let result = rx.await.unwrap();
assert_eq!(result, "test2");
assert!(format!("{:?}", Arbiter::current()).contains("Arbiter"));
}
#[test]
fn test_async() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let runner = crate::System::build()
.stop_on_panic(true)
.build(ntex::rt::DefaultRuntime);
let _ = runner.run(move || {
tx.send(System::current()).unwrap();
Ok(())
});
});
let s = System::new("test", ntex_net::DefaultRuntime);
let sys = rx.recv().unwrap();
let id = sys.id();
let (tx, rx) = mpsc::channel();
sys.arbiter().exec_fn(move || {
let _ = tx.send(System::current().id());
});
let id2 = rx.recv().unwrap();
assert_eq!(id, id2);
let (tx, rx) = mpsc::channel();
sys.handle().spawn(async move {
let _ = tx.send(System::current().id());
});
let id2 = rx.recv().unwrap();
assert_eq!(id, id2);
let (tx, rx) = mpsc::channel();
sys.arbiter().handle().spawn(async move {
let _ = tx.send(System::current().id());
});
let id2 = rx.recv().unwrap();
assert_eq!(id, id2);
let id2 = s
.block_on(sys.arbiter().exec(|| System::current().id()))
.unwrap();
assert_eq!(id, id2);
let (tx, rx) = mpsc::channel();
sys.arbiter().spawn(Box::pin(async move {
let _ = tx.send(System::current().id());
}));
let id2 = rx.recv().unwrap();
assert_eq!(id, id2);
}
#[cfg(feature = "tokio")]
#[test]
fn test_block_on() {
let (tx, rx) = mpsc::channel();
struct Custom;
impl ntex::rt::Runner for Custom {
fn block_on(&self, fut: ntex::rt::BlockFuture) {
let rt = ntex::rt::tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
ntex::rt::tokio::task::LocalSet::new().block_on(&rt, fut);
}
}
thread::spawn(move || {
let runner = crate::System::build()
.stop_on_panic(true)
.ping_interval(25)
.build(Custom);
let _ = runner.run(move || {
tx.send(System::current()).unwrap();
Ok(())
});
});
let s = System::new("test", ntex::rt::DefaultRuntime);
let sys = rx.recv().unwrap();
let id = sys.id();
let (tx, rx) = mpsc::channel();
sys.arbiter().exec_fn(move || {
let _ = tx.send(System::current().id());
});
let id2 = rx.recv().unwrap();
assert_eq!(id, id2);
let id2 = s
.block_on(sys.arbiter().exec(|| System::current().id()))
.unwrap();
assert_eq!(id, id2);
let (tx, rx) = mpsc::channel();
sys.arbiter().spawn(async move {
ntex::time::sleep(std::time::Duration::from_millis(100)).await;
let recs = System::list_arbiter_pings(Arbiter::current().id(), |recs| {
recs.unwrap().clone()
});
let _ = tx.send(recs);
});
let recs = rx.recv().unwrap();
assert!(!recs.is_empty());
sys.stop();
}
#[test]
fn test_arbiter_local_storage() {
let _s = System::new("test", ntex::rt::DefaultRuntime);
Arbiter::set_item("test");
assert!(Arbiter::get_item::<&'static str, _, _>(|s| *s == "test"));
assert!(Arbiter::contains_item::<&'static str>());
assert!(Arbiter::get_value(|| 64u64) == 64);
ntex::rt::set_item(100u32);
assert_eq!(ntex::rt::get_item::<u32>().unwrap(), 100);
}
#[test]
fn test_spawn_api() {
System::new("test", ntex::rt::DefaultRuntime).block_on(async {
let mut hnd = ntex::rt::spawn(async {
sleep(Millis(25)).await;
});
assert!(!hnd.is_finished());
let _ = (&mut hnd).await;
assert!(hnd.is_finished());
let hnd = crate::Handle::current();
let res = hnd
.spawn(async {
sleep(Millis(25)).await;
1
})
.await
.unwrap();
assert_eq!(res, 1);
});
}
#[test]
fn test_spawn_cb() {
let counter = Arc::new(AtomicUsize::new(0));
let c = counter.clone();
let before = move || {
let _ = c.fetch_add(1, Ordering::Relaxed);
Some(c.as_ptr() as *const _)
};
let c = counter.clone();
let enter = move |_| {
let _ = c.fetch_add(1, Ordering::Relaxed);
c.as_ptr() as *const _
};
let c = counter.clone();
let exit = move |_| {
let _ = c.fetch_add(1, Ordering::Relaxed);
};
let c = counter.clone();
let after = move |_| {
let _ = c.fetch_add(1, Ordering::Relaxed);
};
unsafe {
let set = ntex::rt::task_opt_callbacks(
before.clone(),
enter.clone(),
exit.clone(),
after.clone(),
);
assert!(set);
let set = ntex::rt::task_opt_callbacks(before, enter, exit, after);
assert!(!set);
}
System::new("test", ntex::rt::DefaultRuntime).block_on(async {
let mut hnd = ntex::rt::spawn(async {
sleep(Millis(25)).await;
});
let _ = (&mut hnd).await;
assert!(hnd.is_finished());
});
let val = counter.load(Ordering::Relaxed);
assert!(val > 0);
}
#[cfg(all(target_os = "linux", feature = "neon-polling"))]
#[ntex::test]
async fn idle_disconnect_polling() {
use std::sync::Mutex;
use ntex::connect::Connect;
use ntex::{SharedCfg, io::Io, io::IoConfig, time::Millis, time::sleep, util::Bytes};
const DATA: &[u8] = b"Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World";
let (tx, rx) = ::oneshot::channel();
let tx = Arc::new(Mutex::new(Some(tx)));
let server = ntex::server::test_server(async move || {
let tx = tx.clone();
ntex::fn_service(move |io: Io<_>| {
tx.lock().unwrap().take().unwrap().send(()).unwrap();
async move {
io.encode_bytes(Bytes::from_static(DATA)).unwrap();
sleep(Millis(250)).await;
io.close();
Ok::<_, ()>(())
}
})
});
let cfg = SharedCfg::new("NEON")
.add(IoConfig::new().set_read_buf(24, 12, 16))
.into();
let msg = Connect::new(server.addr());
let io = ntex::connect::connect_with(msg, cfg).await.unwrap();
rx.await.unwrap();
io.on_disconnect().await;
}
#[cfg(all(target_os = "linux", feature = "neon-uring"))]
#[ntex::test]
async fn idle_disconnect_uring() {
use std::sync::Mutex;
use ntex::io::{Io, IoConfig};
use ntex::{SharedCfg, connect::Connect, time::Millis, time::sleep, util::Bytes};
const DATA: &[u8] = b"Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World";
let (tx, rx) = ::oneshot::channel();
let tx = Arc::new(Mutex::new(Some(tx)));
let server = ntex::server::test_server(async move || {
let tx = tx.clone();
ntex::fn_service(move |io: Io<_>| {
tx.lock().unwrap().take().unwrap().send(()).unwrap();
async move {
io.encode_slice(DATA).unwrap();
sleep(Millis(250)).await;
io.encode_bytes(Bytes::from_static(DATA)).unwrap();
sleep(Millis(250)).await;
io.close();
Ok::<_, ()>(())
}
})
});
let cfg = SharedCfg::new("NEON-URING")
.add(IoConfig::new().set_read_buf(24, 12, 16))
.into();
let msg = Connect::new(server.addr());
let io = ntex::connect::connect_with(msg, cfg).await.unwrap();
rx.await.unwrap();
io.on_disconnect().await;
}