use std::time::{Duration, Instant};
use async_std::sync::Arc;
use async_std::task;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use futures::future::BoxFuture;
use task::JoinHandle;
pub use self::company::Company;
pub use self::current_price::CurrentPrice;
pub use self::default_timestamps::DefaultTimestamps;
pub use self::options_data::OptionsData;
pub use self::options_dates::OptionsDates;
pub use self::prices::Prices;
use crate::{DATA_RECEIVED, UPDATE_INTERVAL};
mod company;
mod current_price;
mod default_timestamps;
mod options_data;
mod options_dates;
mod prices;
pub trait AsyncTask: 'static {
type Input: Send + Sync;
type Response: Send;
fn update_interval(&self) -> Option<Duration>;
fn input(&self) -> Self::Input;
fn task<'a>(input: Arc<Self::Input>) -> BoxFuture<'a, Option<Self::Response>>;
fn connect(&self) -> AsyncTaskHandle<Self::Response> {
let (command_sender, command_receiver) = bounded(1);
let (response_sender, response_receiver) = unbounded::<Self::Response>();
let data_received = DATA_RECEIVED.0.clone();
let update_interval = self.update_interval();
let input = Arc::new(self.input());
let handle = task::spawn(async move {
let mut last_updated = Instant::now();
let mut paused = false;
if let Some(response) = <Self as AsyncTask>::task(input.clone()).await {
let _ = response_sender.send(response);
let _ = data_received.try_send(());
}
let update_interval = if let Some(interval) = update_interval {
interval.max(Duration::from_secs(*UPDATE_INTERVAL))
} else {
return;
};
loop {
if let Ok(command) = command_receiver.try_recv() {
match command {
AsyncTaskCommand::Resume => paused = false,
AsyncTaskCommand::Pause => paused = true,
}
}
if last_updated.elapsed() >= update_interval && !paused {
if let Some(response) = <Self as AsyncTask>::task(input.clone()).await {
let _ = response_sender.send(response);
let _ = data_received.try_send(());
}
last_updated = Instant::now();
}
task::sleep(Duration::from_millis(500)).await;
}
});
AsyncTaskHandle {
response: response_receiver,
handle: Some(handle),
command_sender,
}
}
}
enum AsyncTaskCommand {
Pause,
Resume,
}
pub struct AsyncTaskHandle<R> {
response: Receiver<R>,
handle: Option<JoinHandle<()>>,
command_sender: Sender<AsyncTaskCommand>,
}
impl<R> AsyncTaskHandle<R> {
pub fn response(&self) -> &Receiver<R> {
&self.response
}
pub fn pause(&self) {
let _ = self.command_sender.try_send(AsyncTaskCommand::Pause);
}
pub fn resume(&self) {
let _ = self.command_sender.try_send(AsyncTaskCommand::Resume);
}
}
impl<R> Drop for AsyncTaskHandle<R> {
fn drop(&mut self) {
let handle = self.handle.take().unwrap();
task::spawn(async { handle.cancel().await });
}
}