use chrono::Duration;
use std::sync::atomic::{AtomicBool, Ordering};
use triple_buffer::{TripleBuffer, Input, Output};
use super::{
state::{ClientStartStopState, ClientState},
timeline::Timeline,
};
use crate::link::IncomingClientState;
#[derive(Clone, Debug, Default)]
pub struct RtClientState {
pub timeline: Timeline,
pub start_stop_state: ClientStartStopState,
pub timeline_timestamp: Duration,
pub start_stop_state_timestamp: Duration,
}
impl From<ClientState> for RtClientState {
fn from(client_state: ClientState) -> Self {
Self {
timeline: client_state.timeline,
start_stop_state: client_state.start_stop_state,
timeline_timestamp: Duration::zero(),
start_stop_state_timestamp: Duration::zero(),
}
}
}
impl From<RtClientState> for ClientState {
fn from(rt_state: RtClientState) -> Self {
Self {
timeline: rt_state.timeline,
start_stop_state: rt_state.start_stop_state,
}
}
}
pub struct RtSessionStateHandler {
rt_client_state: RtClientState,
timeline_input: Input<(Duration, Timeline)>,
timeline_output: Output<(Duration, Timeline)>,
start_stop_input: Input<ClientStartStopState>,
start_stop_output: Output<ClientStartStopState>,
has_pending_updates: AtomicBool,
local_mod_grace_period: Duration,
}
impl RtSessionStateHandler {
pub fn new(initial_state: ClientState, grace_period: Duration) -> Self {
let default_timeline_entry = (Duration::zero(), initial_state.timeline);
let default_start_stop_state = initial_state.start_stop_state;
let timeline_buffer = TripleBuffer::new(&default_timeline_entry);
let (timeline_input, timeline_output) = timeline_buffer.split();
let start_stop_buffer = TripleBuffer::new(&default_start_stop_state);
let (start_stop_input, start_stop_output) = start_stop_buffer.split();
Self {
rt_client_state: initial_state.into(),
timeline_input,
timeline_output,
start_stop_input,
start_stop_output,
has_pending_updates: AtomicBool::new(false),
local_mod_grace_period: grace_period,
}
}
pub fn get_rt_client_state(&self, current_time: Duration) -> ClientState {
let timeline_grace_expired =
current_time - self.rt_client_state.timeline_timestamp > self.local_mod_grace_period;
let start_stop_grace_expired =
current_time - self.rt_client_state.start_stop_state_timestamp > self.local_mod_grace_period;
let updated_state = self.rt_client_state.clone();
if timeline_grace_expired {
}
if start_stop_grace_expired {
}
updated_state.into()
}
pub fn update_rt_client_state(
&mut self,
incoming_state: IncomingClientState,
current_time: Duration,
is_enabled: bool,
) {
if incoming_state.timeline.is_none() && incoming_state.start_stop_state.is_none() {
return;
}
if let Some(timeline) = incoming_state.timeline {
self.timeline_input.write((incoming_state.timeline_timestamp, timeline));
self.rt_client_state.timeline = timeline;
self.rt_client_state.timeline_timestamp = if is_enabled {
current_time
} else {
Duration::zero()
};
}
if let Some(start_stop_state) = incoming_state.start_stop_state {
self.start_stop_input.write(start_stop_state);
self.rt_client_state.start_stop_state = start_stop_state;
self.rt_client_state.start_stop_state_timestamp = if is_enabled {
current_time
} else {
Duration::zero()
};
}
self.has_pending_updates.store(true, Ordering::Release);
}
pub fn process_pending_updates(&mut self) -> Option<IncomingClientState> {
if !self.has_pending_updates.load(Ordering::Acquire) {
return None;
}
let mut result = IncomingClientState {
timeline: None,
start_stop_state: None,
timeline_timestamp: Duration::zero(),
};
if self.timeline_output.update() {
let (timestamp, timeline) = *self.timeline_output.read();
result.timeline = Some(timeline);
result.timeline_timestamp = timestamp;
}
if self.start_stop_output.update() {
let start_stop_state = *self.start_stop_output.read();
result.start_stop_state = Some(start_stop_state);
}
if result.timeline.is_some() || result.start_stop_state.is_some() {
self.has_pending_updates.store(false, Ordering::Release);
Some(result)
} else {
None
}
}
pub fn has_pending_updates(&self) -> bool {
self.has_pending_updates.load(Ordering::Acquire)
}
pub fn grace_period(&self) -> Duration {
self.local_mod_grace_period
}
pub fn set_grace_period(&mut self, grace_period: Duration) {
self.local_mod_grace_period = grace_period;
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::link::{
beats::Beats,
tempo::Tempo,
};
fn create_test_client_state() -> ClientState {
ClientState {
timeline: Timeline {
tempo: Tempo::new(120.0),
beat_origin: Beats::new(0.0),
time_origin: Duration::zero(),
},
start_stop_state: ClientStartStopState {
is_playing: false,
time: Duration::zero(),
timestamp: Duration::zero(),
},
}
}
#[test]
fn test_rt_session_state_handler_creation() {
let client_state = create_test_client_state();
let handler = RtSessionStateHandler::new(client_state, Duration::milliseconds(1000));
assert_eq!(handler.grace_period(), Duration::milliseconds(1000));
assert!(!handler.has_pending_updates());
}
#[test]
fn test_rt_session_state_handler_get_state() {
let client_state = create_test_client_state();
let handler = RtSessionStateHandler::new(client_state, Duration::milliseconds(1000));
let current_time = Duration::milliseconds(500);
let retrieved_state = handler.get_rt_client_state(current_time);
assert_eq!(retrieved_state.timeline.tempo.bpm(), client_state.timeline.tempo.bpm());
assert_eq!(retrieved_state.start_stop_state.is_playing, client_state.start_stop_state.is_playing);
}
#[test]
fn test_rt_session_state_handler_updates() {
let client_state = create_test_client_state();
let mut handler = RtSessionStateHandler::new(client_state, Duration::milliseconds(1000));
let new_timeline = Timeline {
tempo: Tempo::new(140.0),
beat_origin: Beats::new(1.0),
time_origin: Duration::milliseconds(1000),
};
let incoming_state = IncomingClientState {
timeline: Some(new_timeline),
start_stop_state: None,
timeline_timestamp: Duration::milliseconds(2000),
};
handler.update_rt_client_state(incoming_state, Duration::milliseconds(3000), true);
assert!(handler.has_pending_updates());
let processed = handler.process_pending_updates();
assert!(processed.is_some());
let processed_state = processed.unwrap();
assert!(processed_state.timeline.is_some());
assert_eq!(processed_state.timeline.unwrap().tempo.bpm(), 140.0);
}
#[test]
fn test_rt_session_state_handler_grace_period() {
let client_state = create_test_client_state();
let mut handler = RtSessionStateHandler::new(client_state, Duration::milliseconds(1000));
handler.set_grace_period(Duration::milliseconds(2000));
assert_eq!(handler.grace_period(), Duration::milliseconds(2000));
}
#[test]
fn test_rt_client_state_conversion() {
let client_state = create_test_client_state();
let rt_state: RtClientState = client_state.into();
let converted_back: ClientState = rt_state.into();
assert_eq!(client_state.timeline.tempo.bpm(), converted_back.timeline.tempo.bpm());
assert_eq!(client_state.start_stop_state.is_playing, converted_back.start_stop_state.is_playing);
}
}