use crate::{
channels::{FlushResult, SharedBackStage, TxConnectable, TxSendError},
core::{AppMonotonicClock, Clock, PubtimeMarker},
prelude::{Acqtime, DoubleBufferTx, Message, OverflowPolicy, Stamp, Tx},
};
pub struct MessageTx<T> {
imp: DoubleBufferTx<Message<T>>,
pub_clock: AppMonotonicClock<PubtimeMarker>,
seq: u64,
}
impl<T> MessageTx<T> {
pub fn new(capacity: usize) -> Self {
Self::new_imp(DoubleBufferTx::new(capacity))
}
pub fn new_auto_size() -> Self {
Self::new_imp(DoubleBufferTx::new_auto_size())
}
fn new_imp(imp: DoubleBufferTx<Message<T>>) -> Self {
Self {
imp,
pub_clock: AppMonotonicClock::new(),
seq: 0,
}
}
pub fn push(&mut self, acqtime: Acqtime, value: T) -> Result<(), TxSendError> {
let seq = self.pub_seq();
let pubtime = self.pub_clock.now();
self.imp.push(Message {
seq,
stamp: Stamp { acqtime, pubtime },
value,
})
}
pub fn push_many<I: IntoIterator<Item = (Acqtime, T)>>(
&mut self,
values: I,
) -> Result<(), TxSendError> {
for (acqtime, value) in values.into_iter() {
self.push(acqtime, value)?;
}
Ok(())
}
fn pub_seq(&mut self) -> u64 {
let seq = self.seq;
self.seq = self.seq.wrapping_add(1);
seq
}
}
impl<T: Send + Sync + Clone> Tx for MessageTx<T> {
fn flush(&mut self) -> FlushResult {
self.imp.flush()
}
fn is_connected(&self) -> bool {
self.imp.is_connected()
}
fn len(&self) -> usize {
self.imp.len()
}
}
impl<V: Send + Sync + Clone> TxConnectable for MessageTx<V> {
type Message = Message<V>;
fn has_max_connection_count(&self) -> bool {
self.imp.has_max_connection_count()
}
fn overflow_policy(&self) -> OverflowPolicy {
self.imp.overflow_policy()
}
fn on_connect(&mut self, stage: SharedBackStage<Self::Message>) {
self.imp.on_connect(stage);
}
}