use crate::adapters::aeron::buffer::FragmentBuffer;
use crate::adapters::aeron::error::TransportError;
use crate::adapters::aeron::status::AeronStatus;
use crate::adapters::aeron::status_stream::AeronStatusStream;
use crate::adapters::aeron::transport::AeronSubscriberBackend;
use crate::channel::{ChannelSender, Message};
use crate::nodes::receiver::ReceiverStream;
use crate::{
Burst, Element, GraphState, IntoStream, MutableNode, Stream, StreamPeekRef, UpStreams,
};
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::{
Arc, Mutex,
atomic::{AtomicBool, Ordering},
};
use std::time::Duration;
use tinyvec::TinyVec;
pub(crate) struct AeronSpinSubFragmentNode<T, F, B>
where
T: Element,
F: FnMut(&FragmentBuffer<'_>) -> Result<Option<T>, TransportError>,
B: AeronSubscriberBackend,
{
backend: B,
parser: F,
value: Burst<T>,
status: Option<Rc<RefCell<AeronStatusStream>>>,
}
impl<T, F, B> AeronSpinSubFragmentNode<T, F, B>
where
T: Element,
F: FnMut(&FragmentBuffer<'_>) -> Result<Option<T>, TransportError>,
B: AeronSubscriberBackend,
{
#[must_use]
pub(crate) fn new(backend: B, parser: F) -> Self {
Self {
backend,
parser,
value: Burst::new(),
status: None,
}
}
#[must_use]
pub(crate) fn with_status(
backend: B,
parser: F,
status: Rc<RefCell<AeronStatusStream>>,
) -> Self {
Self {
backend,
parser,
value: Burst::new(),
status: Some(status),
}
}
}
impl<T, F, B> MutableNode for AeronSpinSubFragmentNode<T, F, B>
where
T: Element,
F: FnMut(&FragmentBuffer<'_>) -> Result<Option<T>, TransportError> + 'static,
B: AeronSubscriberBackend,
{
fn cycle(&mut self, _state: &mut GraphState) -> anyhow::Result<bool> {
self.value.clear();
if let Some(status) = &self.status {
status.borrow_mut().clear();
}
let parser = &mut self.parser;
let value = &mut self.value;
self.backend
.poll_fragments(&mut |frag| match parser(frag) {
Ok(Some(v)) => value.push(v),
Ok(None) => {}
Err(e) => log::warn!(
"aeron sub: parser dropped fragment at position {}: {e}",
frag.position()
),
})?;
let transition = if let Some(status) = &self.status {
let new_status = if self.backend.is_closed() {
AeronStatus::Closed
} else if self.backend.is_connected() {
AeronStatus::Connected
} else {
AeronStatus::Disconnected
};
status.borrow_mut().record(new_status)
} else {
false
};
Ok(!self.value.is_empty() || transition)
}
fn start(&mut self, state: &mut GraphState) -> anyhow::Result<()> {
state.always_callback();
Ok(())
}
fn upstreams(&self) -> UpStreams {
UpStreams::none()
}
}
impl<T, F, B> StreamPeekRef<Burst<T>> for AeronSpinSubFragmentNode<T, F, B>
where
T: Element,
F: FnMut(&FragmentBuffer<'_>) -> Result<Option<T>, TransportError> + 'static,
B: AeronSubscriberBackend,
{
fn peek_ref(&self) -> &Burst<T> {
&self.value
}
}
struct ThreadedAeronFragmentNode<T: Element + Send> {
inner: ReceiverStream<T>,
stop_flag: Arc<AtomicBool>,
}
impl<T: Element + Send> MutableNode for ThreadedAeronFragmentNode<T> {
fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
self.inner.cycle(state)
}
fn upstreams(&self) -> UpStreams {
self.inner.upstreams()
}
fn setup(&mut self, state: &mut GraphState) -> anyhow::Result<()> {
self.inner.setup(state)
}
fn start(&mut self, state: &mut GraphState) -> anyhow::Result<()> {
self.inner.start(state)
}
fn stop(&mut self, state: &mut GraphState) -> anyhow::Result<()> {
self.stop_flag.store(true, Ordering::Relaxed);
self.inner.stop(state).ok();
Ok(())
}
fn teardown(&mut self, state: &mut GraphState) -> anyhow::Result<()> {
self.inner.teardown(state)
}
}
impl<T: Element + Send> StreamPeekRef<TinyVec<[T; 1]>> for ThreadedAeronFragmentNode<T> {
fn peek_ref(&self) -> &TinyVec<[T; 1]> {
self.inner.peek_ref()
}
}
pub(crate) fn build_threaded<T, F, B>(backend: B, parser: F) -> Rc<dyn Stream<Burst<T>>>
where
T: Element + Send,
F: FnMut(&FragmentBuffer<'_>) -> Result<Option<T>, TransportError> + Send + 'static,
B: AeronSubscriberBackend,
{
let stop_flag = Arc::new(AtomicBool::new(false));
let stop_thread = stop_flag.clone();
let state = Mutex::new(Some((backend, parser)));
let inner = ReceiverStream::new(
move |sender: ChannelSender<T>, _stop: Arc<AtomicBool>| {
let mut state_guard = state.lock().expect("state lock poisoned");
let (mut backend, mut parser) = state_guard
.take()
.expect("threaded aeron burst closure called more than once");
drop(state_guard);
let mut idle_count = 0u32;
loop {
if stop_thread.load(Ordering::Relaxed) {
return Ok(());
}
let count = backend.poll_fragments(&mut |frag| match parser(frag) {
Ok(Some(v)) => {
let _ = sender.send_message(Message::RealtimeValue(v));
}
Ok(None) => {}
Err(e) => log::warn!(
"aeron sub: parser dropped fragment at position {}: {e}",
frag.position()
),
})?;
if count == 0 {
idle_count = (idle_count + 1).min(20);
let micros = 1u64 << idle_count.min(10);
std::thread::sleep(Duration::from_micros(micros));
} else {
idle_count = 0;
}
}
},
true,
);
ThreadedAeronFragmentNode { inner, stop_flag }.into_stream()
}
#[derive(Debug, Clone)]
enum AeronItem<T> {
Data(T),
Status(AeronStatus),
}
impl<T: Default> Default for AeronItem<T> {
fn default() -> Self {
AeronItem::Data(T::default())
}
}
struct ThreadedAeronStatusFragmentNode<T: Element + Send> {
inner: ReceiverStream<AeronItem<T>>,
value: Burst<T>,
status: Rc<RefCell<AeronStatusStream>>,
stop_flag: Arc<AtomicBool>,
}
impl<T: Element + Send> MutableNode for ThreadedAeronStatusFragmentNode<T> {
fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
self.value.clear();
self.status.borrow_mut().clear();
self.inner.cycle(state)?;
let mut transition = false;
for item in self.inner.peek_ref().iter() {
match item {
AeronItem::Data(v) => self.value.push(v.clone()),
AeronItem::Status(s) => {
transition |= self.status.borrow_mut().record(*s);
}
}
}
Ok(!self.value.is_empty() || transition)
}
fn upstreams(&self) -> UpStreams {
self.inner.upstreams()
}
fn setup(&mut self, state: &mut GraphState) -> anyhow::Result<()> {
self.inner.setup(state)
}
fn start(&mut self, state: &mut GraphState) -> anyhow::Result<()> {
self.inner.start(state)
}
fn stop(&mut self, state: &mut GraphState) -> anyhow::Result<()> {
self.stop_flag.store(true, Ordering::Relaxed);
self.inner.stop(state).ok();
Ok(())
}
fn teardown(&mut self, state: &mut GraphState) -> anyhow::Result<()> {
self.inner.teardown(state)
}
}
impl<T: Element + Send> StreamPeekRef<Burst<T>> for ThreadedAeronStatusFragmentNode<T> {
fn peek_ref(&self) -> &Burst<T> {
&self.value
}
}
pub(crate) fn build_threaded_with_status<T, F, B>(
backend: B,
parser: F,
status: Rc<RefCell<AeronStatusStream>>,
) -> Rc<dyn Stream<Burst<T>>>
where
T: Element + Send,
F: FnMut(&FragmentBuffer<'_>) -> Result<Option<T>, TransportError> + Send + 'static,
B: AeronSubscriberBackend,
{
let stop_flag = Arc::new(AtomicBool::new(false));
let stop_thread = stop_flag.clone();
let state = Mutex::new(Some((backend, parser)));
let inner = ReceiverStream::new(
move |sender: ChannelSender<AeronItem<T>>, _stop: Arc<AtomicBool>| {
let mut state_guard = state.lock().expect("state lock poisoned");
let (mut backend, mut parser) = state_guard
.take()
.expect("threaded aeron status closure called more than once");
drop(state_guard);
let mut idle_count = 0u32;
let mut last_status = AeronStatus::Disconnected;
loop {
if stop_thread.load(Ordering::Relaxed) {
return Ok(());
}
let count = backend.poll_fragments(&mut |frag| match parser(frag) {
Ok(Some(v)) => {
let _ = sender.send_message(Message::RealtimeValue(AeronItem::Data(v)));
}
Ok(None) => {}
Err(e) => log::warn!(
"aeron sub: parser dropped fragment at position {}: {e}",
frag.position()
),
})?;
let new_status = if backend.is_closed() {
AeronStatus::Closed
} else if backend.is_connected() {
AeronStatus::Connected
} else {
AeronStatus::Disconnected
};
if new_status != last_status {
last_status = new_status;
let _ =
sender.send_message(Message::RealtimeValue(AeronItem::Status(new_status)));
}
if count == 0 {
idle_count = (idle_count + 1).min(20);
let micros = 1u64 << idle_count.min(10);
std::thread::sleep(Duration::from_micros(micros));
} else {
idle_count = 0;
}
}
},
true,
);
ThreadedAeronStatusFragmentNode {
inner,
value: Burst::new(),
status,
stop_flag,
}
.into_stream()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::adapters::aeron::transport::MockSubscriber;
use crate::{IntoStream, NanoTime, NodeOperators, RunFor, RunMode, StreamOperators};
use std::cell::RefCell;
fn i64_parser_typed(f: &FragmentBuffer<'_>) -> Result<Option<i64>, TransportError> {
Ok(f.as_ref().try_into().ok().map(i64::from_le_bytes))
}
struct ConnectedMockSubscriber {
batches: std::collections::VecDeque<Vec<Vec<u8>>>,
connected: bool,
closed: bool,
}
impl ConnectedMockSubscriber {
fn new(messages: Vec<Vec<u8>>, connected: bool) -> Self {
Self {
batches: std::collections::VecDeque::from(vec![messages]),
connected,
closed: false,
}
}
fn with_batches(batches: Vec<Vec<Vec<u8>>>, connected: bool) -> Self {
Self {
batches: batches.into(),
connected,
closed: false,
}
}
}
impl AeronSubscriberBackend for ConnectedMockSubscriber {
fn poll(&mut self, handler: &mut dyn FnMut(&[u8])) -> anyhow::Result<usize> {
let batch = self.batches.pop_front().unwrap_or_default();
let count = batch.len();
for msg in &batch {
handler(msg);
}
Ok(count)
}
fn is_connected(&self) -> bool {
self.connected
}
fn is_closed(&self) -> bool {
self.closed
}
}
#[test]
fn given_burst_spin_node_when_no_fragments_then_empty_burst_and_returns_false_active() {
let backend = MockSubscriber::new(vec![vec![]]);
let node = AeronSpinSubFragmentNode::new(backend, i64_parser_typed);
let stream = node.into_stream();
stream
.clone()
.as_node()
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Cycles(1))
.unwrap();
assert!(stream.peek_value().is_empty());
}
#[test]
fn given_burst_spin_node_when_single_fragment_then_one_element_burst() {
let msg = 42i64.to_le_bytes().to_vec();
let backend = MockSubscriber::from_messages(vec![msg]);
let node = AeronSpinSubFragmentNode::new(backend, i64_parser_typed);
let stream = node.into_stream();
stream
.clone()
.as_node()
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Cycles(1))
.unwrap();
assert_eq!(stream.peek_value().as_slice(), &[42i64]);
}
#[test]
fn given_burst_spin_node_when_three_fragments_one_poll_then_three_element_burst() {
let batch = vec![
1i64.to_le_bytes().to_vec(),
2i64.to_le_bytes().to_vec(),
3i64.to_le_bytes().to_vec(),
];
let backend = MockSubscriber::new(vec![batch]);
let node = AeronSpinSubFragmentNode::new(backend, i64_parser_typed);
let stream = node.into_stream();
stream
.clone()
.as_node()
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Cycles(1))
.unwrap();
assert_eq!(stream.peek_value().as_slice(), &[1i64, 2, 3]);
}
#[test]
fn given_burst_spin_node_when_parser_returns_none_then_fragment_dropped() {
let batch = vec![
vec![0u8; 4], 42i64.to_le_bytes().to_vec(), ];
let backend = MockSubscriber::new(vec![batch]);
let node = AeronSpinSubFragmentNode::new(backend, i64_parser_typed);
let stream = node.into_stream();
stream
.clone()
.as_node()
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Cycles(1))
.unwrap();
assert_eq!(stream.peek_value().as_slice(), &[42i64]);
}
#[test]
fn given_burst_spin_node_when_parser_returns_err_then_fragment_dropped_and_cycle_continues() {
let batch = vec![
1i64.to_le_bytes().to_vec(),
vec![0xDE, 0xAD, 0xBE, 0xEF, 0xDE, 0xAD], 3i64.to_le_bytes().to_vec(),
];
let backend = MockSubscriber::new(vec![batch]);
let parser = |f: &FragmentBuffer<'_>| -> Result<Option<i64>, TransportError> {
match f.as_ref().len() {
8 => Ok(Some(i64::from_le_bytes(f.as_ref().try_into().unwrap()))),
6 => Err(TransportError::Invalid("bad".into())),
_ => Ok(None),
}
};
let node = AeronSpinSubFragmentNode::new(backend, parser);
let stream = node.into_stream();
stream
.clone()
.as_node()
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Cycles(1))
.unwrap();
assert_eq!(stream.peek_value().as_slice(), &[1i64, 3]);
}
#[test]
fn given_burst_spin_node_when_burst_completes_then_clears_before_next_cycle() {
let backend = MockSubscriber::new(vec![vec![1i64.to_le_bytes().to_vec()], vec![]]);
let node = AeronSpinSubFragmentNode::new(backend, i64_parser_typed);
let stream = node.into_stream();
stream
.clone()
.as_node()
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Cycles(2))
.unwrap();
assert!(stream.peek_value().is_empty());
}
#[test]
fn given_burst_spin_node_when_parser_inspects_fragment_header_then_default_header_is_zero() {
let backend = MockSubscriber::from_messages(vec![7i64.to_le_bytes().to_vec()]);
let parser = |f: &FragmentBuffer<'_>| -> Result<Option<i64>, TransportError> {
assert_eq!(f.position(), 0);
assert_eq!(f.header().session_id, 0);
assert_eq!(f.header().stream_id, 0);
Ok(f.as_ref().try_into().ok().map(i64::from_le_bytes))
};
let node = AeronSpinSubFragmentNode::new(backend, parser);
let stream = node.into_stream();
stream
.clone()
.as_node()
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Cycles(1))
.unwrap();
assert_eq!(stream.peek_value().as_slice(), &[7i64]);
}
fn make_graph_state() -> GraphState {
GraphState::new(
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Cycles(1),
NanoTime::ZERO,
)
}
#[test]
fn given_burst_spin_node_with_status_when_connected_subscriber_polled_then_status_transitions_to_connected()
{
let backend = ConnectedMockSubscriber::new(vec![], true);
let status = Rc::new(RefCell::new(AeronStatusStream::default()));
let mut node =
AeronSpinSubFragmentNode::with_status(backend, i64_parser_typed, Rc::clone(&status));
let mut state = make_graph_state();
node.cycle(&mut state).unwrap();
assert_eq!(status.borrow().current(), AeronStatus::Connected);
assert_eq!(status.borrow().peek_ref().len(), 1);
}
#[test]
fn given_burst_spin_node_with_status_when_disconnected_subscriber_polled_then_status_stays_disconnected()
{
let backend = ConnectedMockSubscriber::new(vec![], false);
let status = Rc::new(RefCell::new(AeronStatusStream::default()));
let mut node =
AeronSpinSubFragmentNode::with_status(backend, i64_parser_typed, Rc::clone(&status));
let mut state = make_graph_state();
node.cycle(&mut state).unwrap();
assert_eq!(status.borrow().current(), AeronStatus::Disconnected);
assert!(status.borrow().peek_ref().is_empty());
}
#[test]
fn given_burst_spin_node_with_status_when_subscriber_closes_then_status_transitions_to_closed()
{
let backend = ConnectedMockSubscriber::with_batches(vec![vec![], vec![]], true);
let status = Rc::new(RefCell::new(AeronStatusStream::default()));
let mut node =
AeronSpinSubFragmentNode::with_status(backend, i64_parser_typed, Rc::clone(&status));
let mut state = make_graph_state();
node.cycle(&mut state).unwrap();
assert_eq!(status.borrow().current(), AeronStatus::Connected);
node.backend.closed = true;
node.cycle(&mut state).unwrap();
assert_eq!(status.borrow().current(), AeronStatus::Closed);
}
#[test]
fn given_burst_spin_node_with_status_when_steady_then_no_re_emission() {
let backend = ConnectedMockSubscriber::with_batches(vec![vec![], vec![]], true);
let status = Rc::new(RefCell::new(AeronStatusStream::default()));
let mut node =
AeronSpinSubFragmentNode::with_status(backend, i64_parser_typed, Rc::clone(&status));
let mut state = make_graph_state();
node.cycle(&mut state).unwrap();
assert_eq!(status.borrow().peek_ref().len(), 1);
node.cycle(&mut state).unwrap();
assert!(
status.borrow().peek_ref().is_empty(),
"steady state must not re-emit the same status"
);
assert_eq!(status.borrow().current(), AeronStatus::Connected);
}
#[test]
fn given_burst_spin_node_with_status_when_burst_clears_then_status_burst_also_clears() {
let backend = ConnectedMockSubscriber::new(vec![], true);
let status = Rc::new(RefCell::new(AeronStatusStream::default()));
let mut node =
AeronSpinSubFragmentNode::with_status(backend, i64_parser_typed, Rc::clone(&status));
let mut state = make_graph_state();
node.cycle(&mut state).unwrap();
assert_eq!(status.borrow().peek_ref().len(), 1);
node.cycle(&mut state).unwrap();
assert!(status.borrow().peek_ref().is_empty());
}
#[test]
fn given_burst_spin_node_new_constructor_when_polled_then_no_status_observed() {
let backend = MockSubscriber::from_messages(vec![]);
let mut node = AeronSpinSubFragmentNode::new(backend, i64_parser_typed);
let mut state = make_graph_state();
node.cycle(&mut state).unwrap();
assert!(node.status.is_none());
}
#[test]
fn given_threaded_status_when_connected_backend_then_connected_transition_propagates() {
let backend = ConnectedMockSubscriber::new(vec![7i64.to_le_bytes().to_vec()], true);
let status = Rc::new(RefCell::new(AeronStatusStream::default()));
let data = build_threaded_with_status(backend, i64_parser_typed, Rc::clone(&status));
let status_stream: Rc<dyn Stream<Burst<AeronStatus>>> = status.clone();
status
.borrow_mut()
.set_producer(Rc::downgrade(&data.clone().as_node()));
let collected = status_stream.collect();
collected
.clone()
.run(
RunMode::RealTime,
RunFor::Duration(std::time::Duration::from_millis(400)),
)
.unwrap();
let statuses: Vec<AeronStatus> = collected
.peek_value()
.into_iter()
.flat_map(|burst| burst.value)
.collect();
assert!(
statuses.contains(&AeronStatus::Connected),
"expected a Connected transition, got {statuses:?}"
);
}
}