use crate::adapters::aeron::status::AeronStatus;
use crate::{Burst, GraphState, MutableNode, Node, StreamPeekRef, UpStreams};
use std::rc::Weak;
/// Tracks the most recently recorded `AeronStatus` and buffers status changes
/// in a `Burst`.
///
/// `record` appends a value to `out` only when it differs from `last`;
/// `clear` empties `out` while leaving `last` in place.
#[derive(Default)]
pub struct AeronStatusStream {
/// Most recently recorded status; `record` compares incoming values against it.
last: AeronStatus,
/// Status changes recorded since the last call to `clear`.
out: Burst<AeronStatus>,
/// Optional weak handle to a producer node; `upstreams` reports it while it
/// can still be upgraded.
producer: Option<Weak<dyn Node>>,
}
/// Debug output lists `last`, `out`, and a `has_producer` flag in place of the
/// producer handle itself.
impl std::fmt::Debug for AeronStatusStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the presence of a producer is reported, not the handle.
        let has_producer = self.producer.is_some();

        let mut builder = f.debug_struct("AeronStatusStream");
        builder.field("last", &self.last);
        builder.field("out", &self.out);
        builder.field("has_producer", &has_producer);
        builder.finish()
    }
}
impl AeronStatusStream {
    /// Stores a weak handle to the producer node, replacing any previous one.
    pub(crate) fn set_producer(&mut self, producer: Weak<dyn Node>) {
        self.producer = Some(producer);
    }

    /// Records `new` if it differs from the last recorded status.
    ///
    /// Returns `true` when the value was stored as the new `last` and appended
    /// to the pending burst; returns `false` (and changes nothing) when `new`
    /// equals the last recorded status.
    pub(crate) fn record(&mut self, new: AeronStatus) -> bool {
        let changed = new != self.last;
        if changed {
            self.last = new;
            self.out.push(new);
        }
        changed
    }

    /// Empties the pending burst.
    ///
    /// `last` is left untouched, so recording the same status again after a
    /// clear is still treated as a duplicate.
    pub(crate) fn clear(&mut self) {
        self.out.clear();
    }

    /// Returns the newest entry of the pending burst, falling back to the last
    /// recorded status when the burst is empty.
    pub fn current(&self) -> AeronStatus {
        match self.out.last() {
            Some(latest) => *latest,
            None => self.last,
        }
    }
}
impl MutableNode for AeronStatusStream {
    /// Returns `Ok(true)` while the pending burst holds at least one entry.
    ///
    /// Neither the burst nor `_state` is modified here.
    fn cycle(&mut self, _state: &mut GraphState) -> anyhow::Result<bool> {
        let has_pending = !self.out.is_empty();
        Ok(has_pending)
    }

    /// No start-up work is performed; always returns `Ok(())`.
    fn start(&mut self, _state: &mut GraphState) -> anyhow::Result<()> {
        Ok(())
    }

    /// Reports the producer as the only entry of the first list passed to
    /// `UpStreams::new` when the weak handle is set and still upgradable;
    /// otherwise returns `UpStreams::none()`.
    fn upstreams(&self) -> UpStreams {
        let live_producer = self.producer.as_ref().and_then(Weak::upgrade);
        if let Some(node) = live_producer {
            UpStreams::new(vec![node], vec![])
        } else {
            UpStreams::none()
        }
    }
}
impl StreamPeekRef<Burst<AeronStatus>> for AeronStatusStream {
    /// Borrows the pending burst of status changes without copying it.
    fn peek_ref(&self) -> &Burst<AeronStatus> {
        let Self { out, .. } = self;
        out
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{NanoTime, RunFor, RunMode};

    /// Builds a `GraphState` (`RunMode::HistoricalFrom(NanoTime::ZERO)`, one
    /// cycle) so `cycle` can be called directly on the stream.
    fn make_graph_state() -> GraphState {
        GraphState::new(
            RunMode::HistoricalFrom(NanoTime::ZERO),
            RunFor::Cycles(1),
            NanoTime::ZERO,
        )
    }

    #[test]
    fn given_status_stream_when_record_same_then_no_emission() {
        let mut s = AeronStatusStream::default();
        // Recording the default status is not a transition.
        assert!(!s.record(AeronStatus::Disconnected));
        assert!(s.peek_ref().is_empty());
        assert!(s.record(AeronStatus::Connected));
        // Repeating the current status must be reported as unchanged.
        assert!(!s.record(AeronStatus::Connected));
        assert_eq!(s.peek_ref().len(), 1);
        assert_eq!(s.peek_ref().last(), Some(&AeronStatus::Connected));
    }

    #[test]
    fn given_status_stream_when_record_different_then_emission() {
        let mut s = AeronStatusStream::default();
        assert!(s.record(AeronStatus::Connected));
        assert_eq!(s.peek_ref().len(), 1);
        assert_eq!(s.peek_ref().last(), Some(&AeronStatus::Connected));
        assert_eq!(s.current(), AeronStatus::Connected);
    }

    #[test]
    fn given_status_stream_when_clear_between_cycles_then_burst_resets() {
        let mut s = AeronStatusStream::default();
        assert!(s.record(AeronStatus::Connected));
        assert!(!s.peek_ref().is_empty());
        s.clear();
        assert!(s.peek_ref().is_empty());
        // `clear` keeps the last status, so the same value is still a duplicate.
        assert!(!s.record(AeronStatus::Connected));
        assert!(s.peek_ref().is_empty());
    }

    #[test]
    fn given_connect_then_disconnect_when_two_cycles_then_two_separate_transitions() {
        let mut s = AeronStatusStream::default();
        s.clear();
        assert!(s.record(AeronStatus::Connected));
        assert_eq!(s.peek_ref().len(), 1);
        assert_eq!(s.peek_ref().last(), Some(&AeronStatus::Connected));
        s.clear();
        assert!(s.record(AeronStatus::Disconnected));
        assert_eq!(s.peek_ref().len(), 1);
        assert_eq!(s.peek_ref().last(), Some(&AeronStatus::Disconnected));
    }

    #[test]
    fn given_status_stream_when_no_record_then_cycle_returns_false() {
        let mut s = AeronStatusStream::default();
        let mut gs = make_graph_state();
        assert!(!s.cycle(&mut gs).unwrap());
    }

    #[test]
    fn given_status_stream_when_recorded_then_cycle_returns_true_once_until_clear() {
        let mut s = AeronStatusStream::default();
        let mut gs = make_graph_state();
        assert!(s.record(AeronStatus::Connected));
        assert!(s.cycle(&mut gs).unwrap());
        // `cycle` does not drain the burst, so it keeps reporting true until `clear`.
        assert!(s.cycle(&mut gs).unwrap());
        s.clear();
        assert!(!s.cycle(&mut gs).unwrap());
    }

    #[test]
    fn given_status_stream_when_current_with_no_record_then_returns_default() {
        let s = AeronStatusStream::default();
        assert_eq!(s.current(), AeronStatus::Disconnected);
    }

    #[test]
    fn given_status_stream_when_current_after_record_then_returns_recorded() {
        let mut s = AeronStatusStream::default();
        assert!(s.record(AeronStatus::BackPressured));
        assert_eq!(s.current(), AeronStatus::BackPressured);
    }
}