use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use serde::Serialize;
use crate::rhythm::Rhythm;
use super::frame::FrameRecord;
use super::rlog::FrameWriter;
/// A wrapper around a rhythm `R` that records each yield/feed exchange.
///
/// When used through its [`Rhythm`] implementation, every exchange handled by the
/// domain callback is serialized and written as a [`FrameRecord`] via the shared
/// [`FrameWriter`]; the wrapped rhythm itself does the actual driving.
pub struct RecordingRhythm<R> {
    /// The wrapped rhythm to which driving is delegated.
    inner: R,
    /// Identifier copied into the `rhythm_id` field of every frame this wrapper writes.
    rhythm_id: u16,
    /// Shared sink for recorded frames; also the source of each frame's timestamp
    /// (via `elapsed_ns`).
    writer: Arc<FrameWriter>,
    /// Shared counter that supplies each frame's `global_seq`; incremented once per
    /// recorded frame. Presumably shared with other recorders so that frames get a
    /// single global ordering — confirm against the call sites.
    seq: Arc<AtomicU64>,
}
impl<R> RecordingRhythm<R> {
pub fn new(inner: R, rhythm_id: u16, writer: Arc<FrameWriter>, seq: Arc<AtomicU64>) -> Self {
Self { inner, rhythm_id, writer, seq }
}
pub fn inner_mut(&mut self) -> &mut R {
&mut self.inner
}
}
impl<R> Rhythm for RecordingRhythm<R>
where
R: Rhythm + Send,
R::Yield: Serialize + Clone + Send,
R::Feed: Serialize + Send,
R::Output: Send,
R::Error: Send,
{
type Yield = R::Yield;
type Feed = R::Feed;
type Output = R::Output;
type Error = R::Error;
fn drive<N, F, Fut>(
&mut self,
nodes: N,
mut op_domain: F,
) -> impl Future<Output = (Self::Output, N)> + Send
where
N: Send,
F: FnMut(N, Self::Yield) -> Fut + Send,
Fut: Future<Output = (Self::Feed, N)> + Send,
{
let writer = self.writer.clone();
let seq = self.seq.clone();
let rid = self.rhythm_id;
let wrapped = move |nodes: N, yield_val: Self::Yield| {
let yield_bytes =
bincode::serialize(&yield_val).expect("RecordingRhythm: yield serialize failed");
let t = writer.elapsed_ns();
let fut = op_domain(nodes, yield_val);
let writer = writer.clone();
let seq = seq.clone();
async move {
let (feed_val, returned_nodes) = fut.await;
let feed_bytes =
bincode::serialize(&feed_val).expect("RecordingRhythm: feed serialize failed");
writer.write_frame(&FrameRecord {
global_seq: seq.fetch_add(1, Ordering::SeqCst),
rhythm_id: rid,
timestamp_ns: t,
yield_data: yield_bytes,
feed_data: feed_bytes,
});
(feed_val, returned_nodes)
}
};
self.inner.drive(nodes, wrapped)
}
}