mod diff;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use std::task::Poll;
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde_json::Value;
use crate::diff::diff;
const MAX_DELTA_FRAMES: usize = 256;
#[derive(thiserror::Error, Debug, Clone)]
#[non_exhaustive]
pub enum Error {
#[error(transparent)]
Net(#[from] moq_net::Error),
#[error("json: {0}")]
Json(String),
}
impl From<serde_json::Error> for Error {
fn from(err: serde_json::Error) -> Self {
Error::Json(err.to_string())
}
}
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Clone, Default)]
pub struct Config {
pub delta_ratio: Option<f64>,
}
pub struct Producer<T> {
inner: Arc<Mutex<Inner>>,
_marker: PhantomData<fn(T)>,
}
impl<T> Clone for Producer<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
_marker: PhantomData,
}
}
}
impl<T> Producer<T> {
pub fn consume(&self) -> moq_net::TrackConsumer {
self.inner.lock().unwrap().track.consume()
}
}
impl<T: Serialize> Producer<T> {
pub fn new(track: moq_net::TrackProducer, config: Config) -> Self {
Self {
inner: Arc::new(Mutex::new(Inner {
track,
group: None,
last: None,
group_bytes: 0,
group_frames: 0,
config,
})),
_marker: PhantomData,
}
}
pub fn update(&mut self, value: &T) -> Result<()> {
let json = serde_json::to_value(value)?;
let snapshot = serde_json::to_vec(value)?;
self.inner.lock().unwrap().update(json, snapshot)
}
pub fn finish(&mut self) -> Result<()> {
self.inner.lock().unwrap().finish()
}
}
struct Inner {
track: moq_net::TrackProducer,
group: Option<moq_net::GroupProducer>,
last: Option<Value>,
group_bytes: u64,
group_frames: usize,
config: Config,
}
impl Inner {
fn update(&mut self, json: Value, snapshot: Vec<u8>) -> Result<()> {
if self.last.as_ref() == Some(&json) {
return Ok(());
}
match self.delta(&json, snapshot.len())? {
Some(delta) => {
let group = self.group.as_mut().expect("delta requires an open group");
let len = delta.len() as u64;
group.write_frame(delta)?;
self.group_bytes += len;
self.group_frames += 1;
}
None => self.snapshot(snapshot)?,
}
self.last = Some(json);
Ok(())
}
fn delta(&self, value: &Value, snapshot_len: usize) -> Result<Option<Vec<u8>>> {
let Some(ratio) = self.config.delta_ratio else {
return Ok(None);
};
let Some(last) = &self.last else {
return Ok(None);
};
if self.group.is_none() || self.group_frames >= MAX_DELTA_FRAMES {
return Ok(None);
}
let diff = diff(last, value);
if diff.forced_snapshot {
return Ok(None);
}
let delta = serde_json::to_vec(&diff.patch)?;
let projected = (self.group_bytes + delta.len() as u64) as f64;
if projected > ratio * snapshot_len as f64 {
return Ok(None);
}
Ok(Some(delta))
}
fn snapshot(&mut self, snapshot: Vec<u8>) -> Result<()> {
if let Some(mut group) = self.group.take() {
group.finish()?;
}
let len = snapshot.len() as u64;
let mut group = self.track.append_group()?;
group.write_frame(snapshot)?;
self.group_bytes = len;
self.group_frames = 1;
if self.config.delta_ratio.is_some() {
self.group = Some(group);
} else {
group.finish()?;
}
Ok(())
}
fn finish(&mut self) -> Result<()> {
if let Some(mut group) = self.group.take() {
group.finish()?;
}
self.track.finish()?;
Ok(())
}
}
pub struct Consumer<T> {
track: moq_net::TrackConsumer,
group: Option<moq_net::GroupConsumer>,
current: Option<Value>,
frames_read: usize,
_marker: PhantomData<fn() -> T>,
}
impl<T: DeserializeOwned> Consumer<T> {
pub fn new(track: moq_net::TrackConsumer) -> Self {
Self {
track,
group: None,
current: None,
frames_read: 0,
_marker: PhantomData,
}
}
pub async fn next(&mut self) -> Result<Option<T>>
where
T: Unpin,
{
kio::wait(|waiter| self.poll_next(waiter)).await
}
pub fn poll_next(&mut self, waiter: &kio::Waiter) -> Poll<Result<Option<T>>> {
let track_finished = loop {
match self.track.poll_next_group(waiter)? {
Poll::Ready(Some(group)) => {
self.group = Some(group);
self.current = None;
self.frames_read = 0;
}
Poll::Ready(None) => break true,
Poll::Pending => break false,
}
};
if let Some(group) = &mut self.group {
match group.poll_read_frame(waiter)? {
Poll::Ready(Some(frame)) => return Poll::Ready(Ok(Some(self.apply(frame)?))),
Poll::Ready(None) => self.group = None,
Poll::Pending => return Poll::Pending,
}
}
if track_finished {
Poll::Ready(Ok(None))
} else {
Poll::Pending
}
}
fn apply(&mut self, frame: bytes::Bytes) -> Result<T> {
if self.frames_read == 0 {
self.current = Some(serde_json::from_slice(&frame)?);
} else {
let patch: Value = serde_json::from_slice(&frame)?;
let current = self.current.as_mut().expect("a snapshot precedes any delta");
json_patch::merge(current, &patch);
}
self.frames_read += 1;
let current = self
.current
.as_ref()
.expect("a value is present after applying a frame");
Ok(serde_json::from_value(current.clone())?)
}
}
#[cfg(test)]
mod test {
use super::*;
use serde_json::json;
fn producer(config: Config) -> (Producer<Value>, moq_net::TrackConsumer) {
let track = moq_net::Track::new("test").produce();
let consumer = track.consume();
(Producer::new(track, config), consumer)
}
fn drain(track: moq_net::TrackConsumer) -> Vec<Value> {
let mut consumer = Consumer::<Value>::new(track);
let waiter = kio::Waiter::noop();
let mut out = Vec::new();
while let Poll::Ready(Ok(Some(value))) = consumer.poll_next(&waiter) {
out.push(value);
}
out
}
#[test]
fn deltas_off_snapshot_per_group() {
let (mut producer, track) = producer(Config::default());
producer.update(&json!({ "a": 1 })).unwrap();
producer.update(&json!({ "a": 2 })).unwrap();
producer.finish().unwrap();
assert_eq!(track.latest(), Some(1));
assert_eq!(drain(track), vec![json!({ "a": 2 })]);
}
#[test]
fn live_consumer_sees_each_update() {
let (mut producer, track) = producer(Config::default());
let mut consumer = Consumer::<Value>::new(track);
let waiter = kio::Waiter::noop();
for n in 1..=3 {
producer.update(&json!({ "a": n })).unwrap();
match consumer.poll_next(&waiter) {
Poll::Ready(Ok(Some(value))) => assert_eq!(value, json!({ "a": n })),
other => panic!("expected value, got {other:?}"),
}
}
}
#[test]
fn unchanged_value_writes_nothing() {
let (mut producer, track) = producer(Config::default());
producer.update(&json!({ "a": 1 })).unwrap();
producer.update(&json!({ "a": 1 })).unwrap();
producer.finish().unwrap();
assert_eq!(track.latest(), Some(0));
assert_eq!(drain(track), vec![json!({ "a": 1 })]);
}
#[test]
fn deltas_share_one_group() {
let config = Config {
delta_ratio: Some(100.0),
};
let (mut producer, track) = producer(config);
producer.update(&json!({ "a": 1, "b": 1 })).unwrap();
producer.update(&json!({ "a": 1, "b": 2 })).unwrap();
producer.update(&json!({ "a": 1, "b": 3 })).unwrap();
producer.finish().unwrap();
assert_eq!(track.latest(), Some(0));
let values = drain(track);
assert_eq!(values.last().unwrap(), &json!({ "a": 1, "b": 3 }));
}
#[test]
fn tight_ratio_rolls_snapshots() {
let config = Config { delta_ratio: Some(1.0) };
let (mut producer, track) = producer(config);
producer.update(&json!({ "a": 1 })).unwrap();
producer.update(&json!({ "a": 2 })).unwrap();
producer.update(&json!({ "a": 3 })).unwrap();
producer.finish().unwrap();
assert_eq!(track.latest(), Some(2));
}
#[test]
fn array_change_is_delta() {
let config = Config {
delta_ratio: Some(100.0),
};
let (mut producer, track) = producer(config);
producer.update(&json!({ "list": [1, 2] })).unwrap();
producer.update(&json!({ "list": [1, 2, 3] })).unwrap();
producer.finish().unwrap();
assert_eq!(track.latest(), Some(0));
assert_eq!(drain(track).last().unwrap(), &json!({ "list": [1, 2, 3] }));
}
#[test]
fn frame_cap_rolls_snapshot() {
let config = Config {
delta_ratio: Some(1_000_000.0),
};
let (mut producer, track) = producer(config);
for i in 0..=MAX_DELTA_FRAMES {
producer.update(&json!({ "n": i })).unwrap();
}
producer.finish().unwrap();
assert_eq!(track.latest(), Some(1));
assert_eq!(drain(track).last().unwrap(), &json!({ "n": MAX_DELTA_FRAMES }));
}
#[test]
fn late_joiner_reconstructs_from_deltas() {
let config = Config {
delta_ratio: Some(100.0),
};
let (mut producer, track) = producer(config);
producer.update(&json!({ "a": 1, "b": 1 })).unwrap();
producer.update(&json!({ "a": 1, "b": 2 })).unwrap();
producer.update(&json!({ "a": 5, "b": 2 })).unwrap();
producer.finish().unwrap();
assert_eq!(drain(track).last().unwrap(), &json!({ "a": 5, "b": 2 }));
}
#[test]
fn newer_group_supersedes_in_progress_reconstruction() {
let config = Config { delta_ratio: Some(2.0) };
let (mut producer, track) = producer(config);
let observer = producer.consume();
let mut consumer = Consumer::<Value>::new(track);
let waiter = kio::Waiter::noop();
producer.update(&json!({ "a": 1 })).unwrap(); match consumer.poll_next(&waiter) {
Poll::Ready(Ok(Some(value))) => assert_eq!(value, json!({ "a": 1 })),
other => panic!("expected first value, got {other:?}"),
}
producer.update(&json!({ "a": 2 })).unwrap(); producer.update(&json!({ "a": 3 })).unwrap(); producer.finish().unwrap();
assert_eq!(observer.latest(), Some(1));
let mut last = None;
while let Poll::Ready(Ok(Some(value))) = consumer.poll_next(&waiter) {
last = Some(value);
}
assert_eq!(last.unwrap(), json!({ "a": 3 }));
}
}