use std::{
collections::{hash_map, HashMap},
future::Future,
pin::Pin,
sync::{atomic::AtomicI64, Arc},
time::{Duration, Instant},
};
use crate::{
client::Client,
device::SEDevice,
time::{current_time, SEPTime},
};
use rand::Rng;
use sep2_common::packages::{
identification::ResponseStatus,
objects::EventStatusType,
time::Time,
types::{MRIDType, OneHourRangeType, PrimacyType},
};
use sep2_common::traits::SEEvent;
use tokio::sync::{
broadcast::{Receiver, Sender},
RwLock,
};
/// A single event of type `E` together with the bookkeeping the schedule keeps for it.
///
/// Besides the boxed event itself, an instance records its effective start and end
/// times, its primacy, its current [`EIStatus`], and the MRIDs of any events that
/// have been recorded as superseding it.
pub struct EventInstance<E>
where
E: SEEvent,
{
// Effective start time: the event interval's start, plus a random offset when
// the instance was built with `new_rand`.
start: i64,
// Effective end time: `start` plus the event interval's duration, plus a random
// offset when the instance was built with `new_rand`.
end: i64,
// Primacy supplied at construction; compared in `does_supersede`.
primacy: PrimacyType,
// The wrapped event.
event: Box<E>,
// MRID supplied at construction.
// NOTE(review): presumably the MRID of the program this event belongs to — confirm.
program_mrid: MRIDType,
// Current status; initialised from the event's own `current_status`.
status: EIStatus,
// Set at construction and refreshed by every `update_status` call.
last_updated: Instant,
// MRIDs pushed through `superseded_by()`; starts out empty.
superseded_by: Vec<MRIDType>,
// Identifier supplied at construction.
// NOTE(review): presumably identifies the server the event came from — confirm.
server_id: u8,
}
/// A mutable borrow of an [`EventInstance`] paired with a borrowed `MRIDType`.
// NOTE(review): presumably the MRID is the key the instance is stored under — confirm against callers.
pub(crate) type EIPair<'a, E> = (&'a mut EventInstance<E>, &'a MRIDType);
/// Status of an [`EventInstance`] as tracked by this file.
///
/// Every variant except `Complete` has a same-named `EventStatusType` counterpart
/// that converts into it via `From<EventStatusType>`. Each variant also converts
/// into a `ResponseStatus` via `From<EIStatus>`.
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
#[repr(u8)]
pub enum EIStatus {
// Counted when computing `Events::next_start`.
Scheduled,
// Counted when computing `Events::next_end`.
Active,
// Converts to `ResponseStatus::EventCancelled`.
Cancelled,
// Converts to `ResponseStatus::EventCompleted`; no `EventStatusType` maps to it.
Complete,
// Converts to `ResponseStatus::EventCancelled`, same as `Cancelled`.
CancelledRandom,
// Instances in this state are revisited by `Events::cancel_event`.
Superseded,
}
/// Translates a local [`EIStatus`] into the matching [`ResponseStatus`].
///
/// Both cancellation variants collapse into `ResponseStatus::EventCancelled`.
impl From<EIStatus> for ResponseStatus {
    fn from(status: EIStatus) -> Self {
        match status {
            EIStatus::Scheduled => Self::EventReceived,
            EIStatus::Active => Self::EventStarted,
            EIStatus::Cancelled | EIStatus::CancelledRandom => Self::EventCancelled,
            EIStatus::Complete => Self::EventCompleted,
            EIStatus::Superseded => Self::EventSuperseded,
        }
    }
}
impl From<EventStatusType> for EIStatus {
fn from(value: EventStatusType) -> Self {
match value {
EventStatusType::Scheduled => Self::Scheduled,
EventStatusType::Active => Self::Active,
EventStatusType::Cancelled => Self::Cancelled,
EventStatusType::CancelledRandom => Self::CancelledRandom,
EventStatusType::Superseded => Self::Superseded,
}
}
}
impl<E: SEEvent> EventInstance<E> {
    /// Builds an instance whose start and end come straight from the event's interval.
    ///
    /// `end` is the interval start plus the interval duration. The initial status is
    /// converted from the event's own `current_status`.
    pub(crate) fn new(
        primacy: PrimacyType,
        event: E,
        program_mrid: MRIDType,
        server_id: u8,
    ) -> Self {
        let start: i64 = event.interval().start.get();
        let end: i64 = start + i64::from(event.interval().duration.get());
        Self::with_window(primacy, event, program_mrid, server_id, start, end)
    }

    /// Builds an instance whose start and duration are randomized.
    ///
    /// * `rand_start` bounds the random offset added to the interval start.
    /// * `rand_duration` bounds the random offset added to the interval duration.
    ///
    /// A `None` bound contributes no offset. The effective end is therefore
    /// `randomized start + duration + randomized duration offset`.
    pub(crate) fn new_rand(
        primacy: PrimacyType,
        rand_duration: Option<OneHourRangeType>,
        rand_start: Option<OneHourRangeType>,
        event: E,
        program_mrid: MRIDType,
        server_id: u8,
    ) -> Self {
        // Each bound must be applied to the quantity it is named after: the start
        // offset shifts the whole window, the duration offset only moves the end.
        let start: i64 = event.interval().start.get() + randomize(rand_start);
        let end: i64 =
            start + i64::from(event.interval().duration.get()) + randomize(rand_duration);
        Self::with_window(primacy, event, program_mrid, server_id, start, end)
    }

    /// Shared constructor tail: assembles the instance around an already computed window.
    fn with_window(
        primacy: PrimacyType,
        event: E,
        program_mrid: MRIDType,
        server_id: u8,
        start: i64,
        end: i64,
    ) -> Self {
        EventInstance {
            // Read the status before `event` is moved into the box.
            status: event.event_status().current_status.into(),
            event: Box::new(event),
            primacy,
            start,
            end,
            last_updated: Instant::now(),
            superseded_by: vec![],
            program_mrid,
            server_id,
        }
    }

    /// Returns `true` when `self` supersedes `other`.
    ///
    /// That requires the two `[start, end]` windows to intersect (bounds inclusive)
    /// and either `self` to have a strictly smaller primacy value, or both to have
    /// equal primacy with `self` having the greater creation time.
    pub(crate) fn does_supersede(&self, other: &Self) -> bool {
        let overlaps =
            self.start_time() <= other.end_time() && self.end_time() >= other.start_time();
        let outranks = self.primacy < other.primacy
            || (self.primacy == other.primacy
                && self.event.creation_time() > other.event.creation_time());
        overlaps && outranks
    }

    /// Sets the status and refreshes the `last_updated` timestamp.
    pub(crate) fn update_status(&mut self, status: EIStatus) {
        self.status = status;
        self.last_updated = Instant::now();
    }

    /// Records `other` as an event that supersedes this one.
    pub(crate) fn superseded_by(&mut self, other: &MRIDType) {
        self.superseded_by.push(*other);
    }

    /// Current status of this instance.
    #[inline(always)]
    pub fn status(&self) -> EIStatus {
        self.status
    }

    /// The wrapped event.
    #[inline(always)]
    pub fn event(&self) -> &E {
        &self.event
    }

    /// Primacy supplied when the instance was created.
    #[inline(always)]
    pub fn primacy(&self) -> &PrimacyType {
        &self.primacy
    }

    /// Effective start time (including any randomization applied at construction).
    #[inline(always)]
    pub fn start_time(&self) -> i64 {
        self.start
    }

    /// Effective end time (including any randomization applied at construction).
    #[inline(always)]
    pub fn end_time(&self) -> i64 {
        self.end
    }

    /// Program MRID supplied when the instance was created.
    #[inline(always)]
    pub fn program_mrid(&self) -> &MRIDType {
        &self.program_mrid
    }

    /// Server identifier supplied when the instance was created.
    #[inline(always)]
    pub fn server_id(&self) -> u8 {
        self.server_id
    }
}
/// Draws a random offset whose magnitude is at most `|bound|` and whose sign matches `bound`.
///
/// Returns `0` when no bound is given. The draw is uniform over `0..=|bound|`
/// before the sign is applied.
fn randomize(bound: Option<OneHourRangeType>) -> i64 {
    let Some(range) = bound else {
        return 0;
    };
    let raw = range.get();
    let magnitude: i64 = raw.abs().into();
    let direction = raw.signum() as i64;
    rand::thread_rng().gen_range(0..=magnitude) * direction
}
/// A cloneable, thread-safe callback that is handed an [`EventInstance`] and
/// asynchronously produces a [`ResponseStatus`].
///
/// A blanket implementation in this file covers any suitable `Fn(&EventInstance<E>) -> R`
/// whose return type `R` is a `Send + 'static` future yielding `ResponseStatus`.
pub trait EventCallback<E: SEEvent>: Clone + Send + Sync + 'static {
/// Handles `event` and returns a `Send` future that resolves to a `ResponseStatus`.
// NOTE(review): presumably the returned status is what gets reported for the event — confirm against callers.
fn event_update(&self, event: &EventInstance<E>)
-> impl Future<Output = ResponseStatus> + Send;
}
/// Lets any cloneable, thread-safe closure or function taking `&EventInstance<E>` and
/// returning a `Send + 'static` future of `ResponseStatus` serve as an [`EventCallback`].
impl<F, R, E> EventCallback<E> for F
where
    E: SEEvent,
    F: Fn(&EventInstance<E>) -> R + Clone + Send + Sync + 'static,
    R: Future<Output = ResponseStatus> + Send + 'static,
{
    /// Forwards straight to the wrapped callable.
    fn event_update(
        &self,
        event: &EventInstance<E>,
    ) -> impl Future<Output = ResponseStatus> + Send {
        (self)(event)
    }
}
/// Event instances keyed by `MRIDType`.
type EventsMap<E> = HashMap<MRIDType, EventInstance<E>>;
/// The set of known event instances, plus cached pointers to the next start and next end.
pub(crate) struct Events<E>
where
E: SEEvent,
{
// All instances, keyed by the MRID they were inserted under.
map: EventsMap<E>,
// `(start, mrid)` of the `Scheduled` instance with the smallest start, if any.
next_start: Option<(i64, MRIDType)>,
// `(end, mrid)` of the `Active` instance with the smallest end, if any.
next_end: Option<(i64, MRIDType)>,
}
impl<E> Events<E>
where
    E: SEEvent,
{
    /// Creates an empty collection with no upcoming start or end.
    pub(crate) fn new() -> Self {
        Self {
            next_start: None,
            next_end: None,
            map: HashMap::new(),
        }
    }

    /// Cached `(start, mrid)` of the earliest-starting `Scheduled` instance.
    #[inline(always)]
    pub(crate) fn next_start(&self) -> Option<(i64, MRIDType)> {
        self.next_start
    }

    /// Cached `(end, mrid)` of the earliest-ending `Active` instance.
    #[inline(always)]
    pub(crate) fn next_end(&self) -> Option<(i64, MRIDType)> {
        self.next_end
    }

    /// Mutable iterator over every `(mrid, instance)` pair.
    #[inline(always)]
    pub(crate) fn iter_mut(&mut self) -> hash_map::IterMut<'_, MRIDType, EventInstance<E>> {
        self.map.iter_mut()
    }

    /// Stores `ei` under `mrid`, replacing any instance already stored there.
    ///
    /// The cached next start (for a `Scheduled` instance) or next end (for an
    /// `Active` instance) is moved forward when `ei` comes strictly earlier than
    /// the cached value, or when nothing is cached yet.
    pub(crate) fn insert(&mut self, mrid: &MRIDType, ei: EventInstance<E>) {
        match ei.status() {
            EIStatus::Scheduled => {
                let is_earlier = self.next_start.map_or(true, |(start, _)| ei.start < start);
                if is_earlier {
                    self.next_start = Some((ei.start, *mrid));
                }
            }
            EIStatus::Active => {
                let is_earlier = self.next_end.map_or(true, |(end, _)| ei.end < end);
                if is_earlier {
                    self.next_end = Some((ei.end, *mrid));
                }
            }
            _ => {}
        }
        let _ = self.map.insert(*mrid, ei);
    }

    /// Looks up the instance stored under `mrid`.
    #[inline(always)]
    pub(crate) fn get(&self, mrid: &MRIDType) -> Option<&EventInstance<E>> {
        self.map.get(mrid)
    }

    /// Sets the status of the instance stored under `event` and recomputes the cached nexts.
    ///
    /// # Panics
    ///
    /// Panics if no instance is stored under `event`.
    pub(crate) fn update_event(&mut self, event: &MRIDType, status: EIStatus) {
        self.map.get_mut(event).unwrap().update_status(status);
        self.update_nexts();
    }

    /// Cancels `event` with the given `reason` and releases anything it was superseding.
    ///
    /// Every `Superseded` instance has `event` removed from its list of superseders.
    /// Those left with an empty list and a start later than `current_time` go back
    /// to `Scheduled`. Finally `event` itself is updated to `reason`.
    ///
    /// # Panics
    ///
    /// Panics if no instance is stored under `event`.
    pub(crate) fn cancel_event(&mut self, event: &MRIDType, reason: EIStatus, current_time: i64) {
        for instance in self.map.values_mut() {
            if instance.status() != EIStatus::Superseded {
                continue;
            }
            instance.superseded_by.retain(|superseder| superseder != event);
            let unblocked = instance.superseded_by.is_empty();
            if unblocked && instance.start > current_time {
                instance.update_status(EIStatus::Scheduled);
            }
        }
        self.update_event(event, reason);
    }

    /// Recomputes both cached nexts from scratch by scanning every instance.
    pub(crate) fn update_nexts(&mut self) {
        let earliest_start = self
            .map
            .iter()
            .filter_map(|(mrid, ei)| {
                (ei.status() == EIStatus::Scheduled).then_some((ei.start, *mrid))
            })
            .min_by_key(|&(start, _)| start);
        let earliest_end = self
            .map
            .iter()
            .filter_map(|(mrid, ei)| (ei.status() == EIStatus::Active).then_some((ei.end, *mrid)))
            .min_by_key(|&(end, _)| end);
        self.next_start = earliest_start;
        self.next_end = earliest_end;
    }
}
/// Type-erased, shareable event handler: an `Arc`'d `Send + Sync` function that takes
/// an `&EventInstance<E>` and returns a pinned, boxed `Send` future yielding a
/// `ResponseStatus`. The returned future may borrow from the instance it was given.
pub(crate) type EventHandler<E> = Arc<
dyn Fn(&EventInstance<E>) -> Pin<Box<dyn Future<Output = ResponseStatus> + Send + '_>>
+ Send
+ Sync
+ 'static,
>;
/// Interface for a schedule of events of type `E`.
pub trait Scheduler<E: SEEvent> {
/// Type of the program an event is added under.
type Program;
/// Constructs the scheduler from a client, a shared device, an event callback and a tick rate.
// NOTE(review): presumably `tickrate` controls how often the scheduler wakes to check events — confirm in implementors.
fn new(
client: Client,
device: Arc<RwLock<SEDevice>>,
handler: impl EventCallback<E>,
tickrate: Duration,
) -> Self;
/// Adds `event` under `program`, tagged with `server_id`; completes when the returned future resolves.
fn add_event(
&mut self,
event: E,
program: &Self::Program,
server_id: u8,
) -> impl Future<Output = ()> + Send;
}
/// State for a schedule of events of type `E`.
pub struct Schedule<E>
where
E: SEEvent,
{
// Client handle; cloned along with the schedule.
pub(crate) client: Client,
// Shared, lock-protected device.
pub(crate) device: Arc<RwLock<SEDevice>>,
// Shared, lock-protected event collection; written to by `clean_events`.
pub(crate) events: Arc<RwLock<Events<E>>>,
// Type-erased event handler.
pub(crate) handler: EventHandler<E>,
// Broadcast sender used by `shutdown` to signal termination.
pub(crate) bc_sd: Sender<()>,
// Passed to `crate::time::sleep_until` by `clean_events`.
pub(crate) tickrate: Duration,
// Offset stored by `update_time` and added to `current_time()` in `schedule_time`.
pub(crate) time_offset: Arc<AtomicI64>,
}
impl<E> Clone for Schedule<E>
where
E: SEEvent,
{
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
device: self.device.clone(),
events: self.events.clone(),
handler: self.handler.clone(),
bc_sd: self.bc_sd.clone(),
tickrate: self.tickrate,
time_offset: Arc::new(AtomicI64::new(0)),
}
}
}
impl<E> Schedule<E>
where
E: SEEvent,
{
pub fn update_time(&mut self, time: Time) {
let offset = time.current_time.get() - i64::from(current_time());
self.time_offset
.store(offset, std::sync::atomic::Ordering::Relaxed);
}
pub fn shutdown(&mut self) {
match self.bc_sd.send(()) {
Ok(_) => log::info!("{}Schedule: Successfully shutdown gracefully", E::name()),
Err(_) => log::error!("{}Schedule: Failed to shutdown gracefully", E::name()),
};
}
pub(crate) fn schedule_time(&self) -> SEPTime {
current_time() + self.time_offset.load(std::sync::atomic::Ordering::Relaxed)
}
pub(crate) async fn clean_events(self, mut rx: Receiver<()>) {
let week = Duration::from_secs(60 * 60 * 24 * 7);
let mut last = Instant::now();
let mut next = last + week;
loop {
tokio::select! {
_ = crate::time::sleep_until(next,self.tickrate) => (),
_ = rx.recv() => {
log::info!("{}Schedule: Shutting down clean event task...", E::name());
break
},
}
self.events.write().await.map.retain(|_, ei| {
matches!(ei.status, EIStatus::Active | EIStatus::Scheduled)
|| ei.last_updated > last
});
last = Instant::now();
next = last + week;
}
}
}