#![allow(unused)]
#![cfg(all(feature = "derive", feature = "force-poll"))]
mod common;
use core::sync::atomic::{AtomicBool, Ordering};
static LOGGER_INIT: AtomicBool = AtomicBool::new(false);
use crossbus::{
actor::{Actor, ActorState, Handle, Localizer},
context::Context,
delayer::Delaying,
reactor::Reactor,
Message,
};
use core::{
pin::Pin,
task::{Context as CoreContext, Poll},
time::Duration,
};
const DUR_DELAYED: f64 = 0.5;
const DUR_FINITE: f64 = 0.3;
const DUR_INF: f64 = 0.7;
#[derive(Message, Debug)]
struct Msg {
typ: MsgType,
content: &'static str,
}
#[derive(Debug)]
enum MsgType {
Instant,
Delayed,
Finite(usize),
Infinite,
}
struct Narrator {
instant: Desc,
delayed: Desc,
finite: Desc,
inf: Desc,
}
struct Desc {
used: usize,
send_at: Option<f64>,
receive_at: Option<f64>,
}
impl Desc {
fn new() -> Self {
Self {
used: 0,
send_at: None,
receive_at: None,
}
}
}
impl Delaying<Narrator> for Narrator {}
impl Actor for Narrator {
type Message = Msg;
fn create(ctx: &mut Context<Self>) -> Self {
Self {
instant: Desc::new(),
delayed: Desc::new(),
finite: Desc::new(),
inf: Desc::new(),
}
}
fn started(&mut self, ctx: &mut Context<Self>) {
let sender = ctx.sender();
let mut items = vec![
Msg {
typ: MsgType::Instant,
content: "instant msg",
},
Msg {
typ: MsgType::Delayed,
content: "delayed msg",
},
Msg {
typ: MsgType::Finite(3),
content: "repeat 3 msg",
},
Msg {
typ: MsgType::Infinite,
content: "loop msg",
},
];
let now = common::get_now();
while let Some(item) = items.pop() {
match item.typ {
MsgType::Instant => {
ctx.instant_message(item);
self.instant.send_at = Some(now);
}
MsgType::Delayed => {
self.delayed.send_at = Some(now);
let dur = Duration::from_secs_f64(DUR_DELAYED);
#[cfg(any(feature = "async-std", feature = "tokio"))]
ctx.delay_message::<std::time::Instant>(item, dur);
#[cfg(feature = "wasm32")]
ctx.delay_message::<common::time_wasm32::Clock>(item, dur);
}
MsgType::Finite(num) => {
self.finite.send_at = Some(now);
let dur = Duration::from_secs_f64(DUR_FINITE);
#[cfg(any(feature = "async-std", feature = "tokio"))]
unsafe {
ctx.repeat_message::<std::time::Instant>(item, Some(dur), Some(num))
};
#[cfg(feature = "wasm32")]
unsafe {
ctx.repeat_message::<common::time_wasm32::Clock>(item, Some(dur), Some(num))
};
}
MsgType::Infinite => {
self.inf.send_at = Some(now);
let dur = Duration::from_secs_f64(DUR_INF);
#[cfg(any(feature = "async-std", feature = "tokio"))]
unsafe {
ctx.repeat_message::<std::time::Instant>(item, Some(dur), None)
};
#[cfg(feature = "wasm32")]
unsafe {
ctx.repeat_message::<common::time_wasm32::Clock>(item, Some(dur), None)
};
}
}
}
}
fn action(&mut self, msg: Self::Message, ctx: &mut Context<Self>) {
let now = common::get_now();
match msg.typ {
MsgType::Instant => {
self.instant.used += 1;
self.instant.receive_at = Some(now);
log::info!(
"instant message: {:?}, elapsed: {}",
msg,
self.instant.receive_at.unwrap() - self.instant.send_at.unwrap()
);
}
MsgType::Delayed => {
self.delayed.used += 1;
self.delayed.receive_at = Some(now);
log::info!(
"{:?}, elapsed: {}",
msg,
now - self.delayed.send_at.unwrap()
);
}
MsgType::Finite(num) => {
self.finite.used += 1;
self.finite.receive_at = Some(now);
log::info!(
"{:?}, elapsed: {} secs with {} items",
msg,
now - self.finite.send_at.unwrap(),
self.finite.used
);
}
MsgType::Infinite => {
self.inf.used = self.inf.used + 1;
self.inf.receive_at = Some(now);
log::info!(
"{:?}, elapsed: {}, Infinite loop, now: {}",
msg,
now - self.inf.send_at.as_ref().unwrap(),
self.inf.used,
);
}
}
}
}
#[cfg(feature = "tokio")]
fn replace_tokio_runtime() {
use crossbus::rt::runtime_tokio::TokioRuntime;
let rt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.build()
.unwrap();
TokioRuntime::set_runtime(rt);
}
#[test]
#[cfg(feature = "tokio")]
fn test_delay() {
use crossbus::rt::{runtime_tokio::Runtime, Spawning};
if !LOGGER_INIT.load(Ordering::SeqCst) {
common::init_module_level("test_delay", log::Level::Debug);
LOGGER_INIT.store(true, Ordering::SeqCst);
}
#[crossbus::main(runtime = tokio)]
async fn main() {
let _ = Narrator::start();
}
main();
}
#[test]
#[cfg(feature = "async-std")]
fn test_delay() {
use crossbus::rt::{runtime_async_std::Runtime, Spawning};
if !LOGGER_INIT.load(Ordering::SeqCst) {
common::init_module_level("test_delay", log::Level::Debug);
LOGGER_INIT.store(true, Ordering::SeqCst);
}
#[crossbus::main(runtime = async-std)]
async fn main() {
let _ = Narrator::start();
}
main();
}
#[cfg(any(feature = "wasm32"))]
#[wasm_bindgen_test::wasm_bindgen_test]
fn test_delay() {
use crossbus::rt::{runtime_wasm32::Runtime, Spawning};
if !LOGGER_INIT.load(Ordering::SeqCst) {
common::init();
LOGGER_INIT.store(true, Ordering::SeqCst);
}
#[crossbus::main(runtime = wasm32)]
async fn main() {
let _ = Narrator::start();
}
main();
}