#![allow(unused)]
mod common;
use core::sync::atomic::{AtomicBool, Ordering};
// Flag recording that the test logger has already been set up, so the
// `test_stream` variants below only call the `common` initialiser once.
static LOGGER_INIT: AtomicBool = AtomicBool::new(false);
use crossbus::{
actor::{Actor, Handle},
context::Context,
stream::{Stream, Streaming},
Message,
};
use core::{
pin::Pin,
task::{Context as CoreContext, Poll},
};
use futures_core::stream;
/// Actor under test: adds up every `Num` delivered by the `St` stream and
/// pauses that stream once, after five messages have been handled.
#[derive(Debug)]
struct Sum {
/// Running total of the received `Num` values.
sum: i32,
/// Handle returned by `ctx.streaming`; stored only when its inner value is non-zero.
handle: Option<Handle>,
/// Set once the one-off pause at `counter == 5` has been requested.
loc: bool,
/// Number of `Num` messages handled so far.
counter: usize,
/// Length of the pause, in the unit returned by `get_now` (seconds).
pause_dur: f64,
/// `get_now` reading taken the first time the stream is observed as paused.
pause_at: Option<f64>,
/// Initialised to 4.0 in `create`; not read anywhere in the visible part of this file.
last_dur: f64,
/// `get_now` reading stored by `Stream::started`.
start_at: Option<f64>,
}
/// Stream item: a single `i32` that `Sum` adds to its running total.
#[derive(Debug, Message)]
struct Num(i32);
/// Message enum wrapping a `Num`.
// NOTE(review): not referenced anywhere else in the visible part of this file.
#[derive(Debug, Message)]
enum Order {
/// Carries one `Num`.
Number(Num),
}
/// Hand-written test stream that yields `Num`s, delaying selected items.
struct St {
/// Items still to be yielded; they are removed from the front one by one.
items: Vec<Num>,
/// `(index, delay)` pairs, consumed front to back: the item at position
/// `index` is held back for `delay` seconds before being yielded.
anchor: Vec<(usize, f32)>,
/// `get_now` time at which the currently delayed item may be yielded;
/// `None` while no delay is in progress.
deadline: Option<f64>,
/// Number of items yielded so far, i.e. the position of the next item.
index: usize,
}
/// Yields the stored `Num`s in order. An item whose position matches the
/// first entry of `anchor` is held back until that entry's delay (seconds,
/// measured with `get_now`) has elapsed.
impl stream::Stream for St {
    type Item = Num;

    /// Returns `Ready(None)` once `items` is empty, `Ready(Some(item))` when
    /// the next item may be yielded, and `Pending` while the next item is
    /// still being delayed.
    ///
    /// # Panics
    ///
    /// Panics if a deadline is still set while an undelayed item is yielded.
    fn poll_next(self: Pin<&mut Self>, cx: &mut CoreContext<'_>) -> Poll<Option<Self::Item>> {
        // `St` only holds `Unpin` data, so plain mutable access is sound.
        let this = self.get_mut();

        if this.items.is_empty() {
            return Poll::Ready(None);
        }

        let (ind, dur) = match this.anchor.first().copied() {
            Some(anchor) => anchor,
            None => {
                // No delays left: stream the remaining items immediately.
                let item = this.items.remove(0);
                this.index += 1;
                assert!(this.deadline.is_none());
                log::debug!("empty anchor, St left: {:?}", this.items);
                return Poll::Ready(Some(item));
            }
        };

        if this.index != ind {
            // The next item is not the anchored one: yield it right away.
            let item = this.items.remove(0);
            this.index += 1;
            assert!(this.deadline.is_none());
            log::debug!("stream now, st left: {:?}", this.items);
            return Poll::Ready(Some(item));
        }

        let now = get_now();
        // Arm the deadline on the first poll that reaches the anchored item.
        let deadline = match this.deadline {
            Some(deadline) => deadline,
            None => {
                let deadline = now + f64::from(dur);
                this.deadline = Some(deadline);
                log::info!("set deadline: {:?}", this.deadline);
                deadline
            }
        };

        if deadline > now {
            // No timer is registered for the deadline, so ask to be polled
            // again; a waker-driven executor would otherwise never come back
            // to this stream after it returned `Pending`.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }

        // Delay elapsed: release the item and drop the anchor that gated it.
        let item = this.items.remove(0);
        this.anchor.remove(0);
        this.index += 1;
        this.deadline = None;
        log::info!("reset deadline");
        log::debug!("timeout done, st left: {:?}", this.items);
        Poll::Ready(Some(item))
    }
}
impl Actor for Sum {
type Message = ();
fn create(_: &mut Context<Self>) -> Self {
Self {
sum: 0,
handle: None,
counter: 0,
loc: false,
pause_dur: 0.01,
pause_at: None,
last_dur: 4.0,
start_at: None,
}
}
fn started(&mut self, ctx: &mut Context<Self>) {
let items = vec![
Num(1), Num(2), Num(1), Num(3), Num(1), Num(5), Num(1), Num(1), Num(1), Num(1), ];
let anchor = vec![(1, 1.0), (3, 1.0), (9, 3.0)];
let st = St {
items,
anchor,
deadline: None,
index: 0,
};
let handle = ctx.streaming(st);
if handle.inner() != 0 {
self.handle = Some(handle);
}
}
fn action(&mut self, _msg: Self::Message, _: &mut Context<Self>) {
log::error!("message should not be here, and handled by Stream::action");
}
fn stopped(&mut self, _: &mut Context<Self>) {
}
}
use crossbus::stream::{StreamState, StreamingState};
impl Stream<Num> for Sum {
fn started(&mut self, _: &mut Context<Self>) {
let now = get_now();
self.start_at.replace(now);
}
fn state(&mut self, ctx: &mut Context<Self>) -> StreamingState {
if self.counter == 5 {
let st = ctx.downcast_ref::<Streaming<Self, St>>(self.handle.unwrap());
assert!(st.is_some());
let st = st.unwrap();
if st.state() == StreamState::Paused {
let ins = get_now();
if self.pause_at.is_none() {
self.pause_at = Some(ins);
}
if self.pause_at.unwrap() + self.pause_dur > ins {
return StreamingState::Pause;
}
log::warn!("St resumed at index 5");
return StreamingState::Resume;
} else if !self.loc {
self.loc = true;
log::warn!("St paused at index 5");
return StreamingState::Pause;
}
}
StreamingState::Continue
}
fn action(&mut self, msg: Num, _: &mut Context<Self>) {
self.sum += msg.0;
self.counter += 1;
log::info!("current sum: {}", self.sum);
}
fn finished(&mut self, _: &mut Context<Self>) {
log::info!("stream finished, whole sum: {}", self.sum);
}
}
/// Starts the `Sum` actor inside a `#[crossbus::main(runtime = tokio)]` entry
/// point.
///
/// NOTE(review): the final sum is only logged, never asserted; the test
/// passes as long as nothing panics.
#[test]
#[cfg(feature = "tokio")]
fn test_stream() {
    use crossbus::rt::{runtime_tokio::Runtime, Spawning};

    // `swap` reads and sets the flag in one atomic step, so the logger is
    // initialised at most once even if several callers race here.
    if !LOGGER_INIT.swap(true, Ordering::SeqCst) {
        common::init();
    }

    #[crossbus::main(runtime = tokio)]
    async fn main() {
        let (addr, _) = Sum::start();
    }
    main();
}
/// Starts the `Sum` actor inside a `#[crossbus::main(runtime = async-std)]`
/// entry point.
///
/// NOTE(review): the final sum is only logged, never asserted; the test
/// passes as long as nothing panics.
#[test]
#[cfg(feature = "async-std")]
fn test_stream() {
    use crossbus::rt::{runtime_async_std::Runtime, Spawning};

    // `swap` reads and sets the flag in one atomic step, so the logger is
    // initialised at most once even if several callers race here.
    if !LOGGER_INIT.swap(true, Ordering::SeqCst) {
        common::init();
    }

    #[crossbus::main(runtime = async-std)]
    async fn main() {
        let (addr, _) = Sum::start();
    }
    main();
}
/// Starts the `Sum` actor inside a `#[crossbus::main(runtime = wasm32)]`
/// entry point, with module-level logging for `test_stream` at debug level.
///
/// NOTE(review): the final sum is only logged, never asserted; the test
/// passes as long as nothing panics.
#[cfg(any(feature = "wasm32"))]
#[wasm_bindgen_test::wasm_bindgen_test]
fn test_stream() {
    use crossbus::rt::{runtime_wasm32::Runtime, Spawning};

    // `swap` reads and sets the flag in one atomic step, so the logger is
    // initialised at most once even if several callers race here.
    if !LOGGER_INIT.swap(true, Ordering::SeqCst) {
        common::init_module_level("test_stream", log::Level::Debug);
    }

    #[crossbus::main(runtime = wasm32)]
    async fn main() {
        let (addr, _) = Sum::start();
    }
    main();
}
/// Current time in seconds, obtained by scaling JavaScript's `Date.now()`
/// reading down by a factor of 1000.
#[cfg(any(feature = "wasm32"))]
fn now() -> f64 {
    const MILLIS_PER_SECOND: f64 = 1000.0;
    js_sys::Date::now() / MILLIS_PER_SECOND
}
/// Returns the current time as fractional seconds.
///
/// With the `tokio` or `async-std` feature the value is the system clock's
/// offset from the Unix epoch; with `wasm32` it is delegated to `now`.
///
/// # Panics
///
/// Panics with "not implemented" when none of those features is enabled, and
/// (native runtimes) if the system clock reads earlier than the Unix epoch.
fn get_now() -> f64 {
    #[cfg(any(feature = "tokio", feature = "async-std"))]
    {
        use std::time::{SystemTime, UNIX_EPOCH};

        let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
        since_epoch.as_secs_f64()
    }
    #[cfg(feature = "wasm32")]
    {
        now()
    }
    #[cfg(not(any(feature = "wasm32", feature = "tokio", feature = "async-std")))]
    unimplemented!()
}