Skip to main content

moq_json/
lib.rs

1//! Snapshot/delta JSON publishing over [`moq-net`](moq_net) tracks.
2//!
3//! A JSON value is published over a track as a series of groups, where each group is
4//! self-contained: its first frame is a full snapshot and any following frames are
5//! [RFC 7396](https://www.rfc-editor.org/rfc/rfc7396.html) JSON Merge Patch deltas applied in
6//! order. A consumer jumps to the newest group, reads the snapshot, and applies the deltas, so
7//! a late joiner never needs older groups.
8//!
9//! Deltas are opt-in via [`Config::delta_ratio`]. With deltas disabled (the default)
10//! every change is a fresh snapshot group, matching a plain "one JSON blob per group" track.
11
12mod diff;
13
14use std::marker::PhantomData;
15use std::sync::{Arc, Mutex};
16use std::task::Poll;
17
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use serde_json::Value;
21
22use crate::diff::diff;
23
24/// Maximum frames (snapshot + deltas) in a single group before a new snapshot is forced.
25///
26/// Kept well below moq-net's per-group frame cap so a late joiner can always read the snapshot
27/// at frame 0 before the group is evicted.
28const MAX_DELTA_FRAMES: usize = 256;
29
30/// Errors produced while publishing or consuming JSON.
31#[derive(thiserror::Error, Debug, Clone)]
32#[non_exhaustive]
33pub enum Error {
34	/// An error from the underlying track.
35	#[error(transparent)]
36	Net(#[from] moq_net::Error),
37
38	/// A value failed to serialize, deserialize, or apply as a merge patch.
39	///
40	/// Stored as a string since [`serde_json::Error`] is not [`Clone`].
41	#[error("json: {0}")]
42	Json(String),
43}
44
45impl From<serde_json::Error> for Error {
46	fn from(err: serde_json::Error) -> Self {
47		Error::Json(err.to_string())
48	}
49}
50
51/// A [`Result`](std::result::Result) using this crate's [`Error`].
52pub type Result<T> = std::result::Result<T, Error>;
53
54/// Configuration for a [`Producer`].
55#[derive(Debug, Clone, Default)]
56pub struct Config {
57	/// Controls whether the producer emits deltas (merge patches) instead of full snapshots.
58	///
59	/// `None` disables deltas: every change is published as a new snapshot group.
60	///
61	/// `Some(ratio)` enables deltas. A delta is appended to the current group as long as the
62	/// group's total size stays within `ratio` times the size of a fresh snapshot; otherwise a
63	/// new snapshot group is started. A larger ratio tolerates bigger groups before snapshotting.
64	pub delta_ratio: Option<f64>,
65}
66
67/// Publishes a JSON value over a track, choosing snapshots and deltas automatically.
68///
69/// Cheaply clonable: clones share one underlying track and publishing state, like other MoQ
70/// producers.
71pub struct Producer<T> {
72	inner: Arc<Mutex<Inner>>,
73	_marker: PhantomData<fn(T)>,
74}
75
76impl<T> Clone for Producer<T> {
77	fn clone(&self) -> Self {
78		Self {
79			inner: self.inner.clone(),
80			_marker: PhantomData,
81		}
82	}
83}
84
85impl<T> Producer<T> {
86	/// Create a subscriber for the underlying track.
87	pub fn consume(&self) -> moq_net::TrackConsumer {
88		self.inner.lock().unwrap().track.consume()
89	}
90}
91
92impl<T: Serialize> Producer<T> {
93	/// Create a producer that publishes to the given track.
94	pub fn new(track: moq_net::TrackProducer, config: Config) -> Self {
95		Self {
96			inner: Arc::new(Mutex::new(Inner {
97				track,
98				group: None,
99				last: None,
100				group_bytes: 0,
101				group_frames: 0,
102				config,
103			})),
104			_marker: PhantomData,
105		}
106	}
107
108	/// Publish a new value, emitting a snapshot or a delta automatically.
109	///
110	/// Does nothing if the value is unchanged from the previous publish.
111	pub fn update(&mut self, value: &T) -> Result<()> {
112		let json = serde_json::to_value(value)?;
113		// Serialize the value directly (not via `json`) so a snapshot preserves the type's own
114		// field order, keeping the wire bytes identical to serializing `T` straight to a frame.
115		let snapshot = serde_json::to_vec(value)?;
116		self.inner.lock().unwrap().update(json, snapshot)
117	}
118
119	/// Finish the track, closing any open group.
120	pub fn finish(&mut self) -> Result<()> {
121		self.inner.lock().unwrap().finish()
122	}
123}
124
125/// Shared publishing state behind [`Producer`]'s `Arc<Mutex>`.
126struct Inner {
127	track: moq_net::TrackProducer,
128	group: Option<moq_net::GroupProducer>,
129	last: Option<Value>,
130	group_bytes: u64,
131	group_frames: usize,
132	config: Config,
133}
134
135impl Inner {
136	fn update(&mut self, json: Value, snapshot: Vec<u8>) -> Result<()> {
137		if self.last.as_ref() == Some(&json) {
138			return Ok(());
139		}
140
141		match self.delta(&json, snapshot.len())? {
142			Some(delta) => {
143				let group = self.group.as_mut().expect("delta requires an open group");
144				let len = delta.len() as u64;
145				group.write_frame(delta)?;
146				self.group_bytes += len;
147				self.group_frames += 1;
148			}
149			None => self.snapshot(snapshot)?,
150		}
151
152		self.last = Some(json);
153		Ok(())
154	}
155
156	/// Serialize a delta if deltas are enabled and appending one keeps the group within budget;
157	/// otherwise `None`, signalling that a fresh snapshot should be published instead.
158	fn delta(&self, value: &Value, snapshot_len: usize) -> Result<Option<Vec<u8>>> {
159		let Some(ratio) = self.config.delta_ratio else {
160			return Ok(None);
161		};
162		let Some(last) = &self.last else {
163			return Ok(None);
164		};
165		if self.group.is_none() || self.group_frames >= MAX_DELTA_FRAMES {
166			return Ok(None);
167		}
168
169		let diff = diff(last, value);
170		if diff.forced_snapshot {
171			return Ok(None);
172		}
173
174		let delta = serde_json::to_vec(&diff.patch)?;
175
176		// Roll a snapshot if appending the delta would bloat the group past the budget.
177		let projected = (self.group_bytes + delta.len() as u64) as f64;
178		if projected > ratio * snapshot_len as f64 {
179			return Ok(None);
180		}
181
182		Ok(Some(delta))
183	}
184
185	/// Start a new group with a full snapshot as its first frame.
186	fn snapshot(&mut self, snapshot: Vec<u8>) -> Result<()> {
187		// The previous group is complete; no more frames will be appended to it.
188		if let Some(mut group) = self.group.take() {
189			group.finish()?;
190		}
191
192		let len = snapshot.len() as u64;
193		let mut group = self.track.append_group()?;
194		group.write_frame(snapshot)?;
195		self.group_bytes = len;
196		self.group_frames = 1;
197
198		if self.config.delta_ratio.is_some() {
199			// Keep the group open so future deltas can be appended.
200			self.group = Some(group);
201		} else {
202			// Deltas disabled: one frame per group, identical to a plain JSON track.
203			group.finish()?;
204		}
205
206		Ok(())
207	}
208
209	fn finish(&mut self) -> Result<()> {
210		if let Some(mut group) = self.group.take() {
211			group.finish()?;
212		}
213		self.track.finish()?;
214		Ok(())
215	}
216}
217
218/// Consumes a JSON value from a track, reconstructing it from snapshots and deltas.
219pub struct Consumer<T> {
220	track: moq_net::TrackConsumer,
221	group: Option<moq_net::GroupConsumer>,
222	current: Option<Value>,
223	frames_read: usize,
224	_marker: PhantomData<fn() -> T>,
225}
226
227impl<T: DeserializeOwned> Consumer<T> {
228	/// Create a consumer reading from the given track subscriber.
229	pub fn new(track: moq_net::TrackConsumer) -> Self {
230		Self {
231			track,
232			group: None,
233			current: None,
234			frames_read: 0,
235			_marker: PhantomData,
236		}
237	}
238
239	/// Get the next reconstructed value, or `None` once the track ends.
240	pub async fn next(&mut self) -> Result<Option<T>>
241	where
242		T: Unpin,
243	{
244		kio::wait(|waiter| self.poll_next(waiter)).await
245	}
246
247	/// Poll for the next reconstructed value, without blocking.
248	///
249	/// Jumps to the newest group, reads its snapshot, and applies deltas in order, yielding the
250	/// reconstructed value after each frame. Switching to a newer group discards the older one.
251	pub fn poll_next(&mut self, waiter: &kio::Waiter) -> Poll<Result<Option<T>>> {
252		// Drain to the newest group, resetting reconstruction state whenever we switch.
253		let track_finished = loop {
254			match self.track.poll_next_group(waiter)? {
255				Poll::Ready(Some(group)) => {
256					self.group = Some(group);
257					self.current = None;
258					self.frames_read = 0;
259				}
260				Poll::Ready(None) => break true,
261				Poll::Pending => break false,
262			}
263		};
264
265		if let Some(group) = &mut self.group {
266			match group.poll_read_frame(waiter)? {
267				Poll::Ready(Some(frame)) => return Poll::Ready(Ok(Some(self.apply(frame)?))),
268				// The current group is exhausted; wait for a newer one.
269				Poll::Ready(None) => self.group = None,
270				Poll::Pending => return Poll::Pending,
271			}
272		}
273
274		if track_finished {
275			Poll::Ready(Ok(None))
276		} else {
277			Poll::Pending
278		}
279	}
280
281	/// Apply one frame: frame 0 of a group is a snapshot, the rest are merge patches.
282	fn apply(&mut self, frame: bytes::Bytes) -> Result<T> {
283		if self.frames_read == 0 {
284			self.current = Some(serde_json::from_slice(&frame)?);
285		} else {
286			let patch: Value = serde_json::from_slice(&frame)?;
287			let current = self.current.as_mut().expect("a snapshot precedes any delta");
288			json_patch::merge(current, &patch);
289		}
290		self.frames_read += 1;
291
292		let current = self
293			.current
294			.as_ref()
295			.expect("a value is present after applying a frame");
296		Ok(serde_json::from_value(current.clone())?)
297	}
298}
299
300#[cfg(test)]
301mod test {
302	use super::*;
303	use serde_json::json;
304
305	fn producer(config: Config) -> (Producer<Value>, moq_net::TrackConsumer) {
306		let track = moq_net::Track::new("test").produce();
307		let consumer = track.consume();
308		(Producer::new(track, config), consumer)
309	}
310
311	/// Drain every value currently available from a consumer without blocking.
312	fn drain(track: moq_net::TrackConsumer) -> Vec<Value> {
313		let mut consumer = Consumer::<Value>::new(track);
314		let waiter = kio::Waiter::noop();
315		let mut out = Vec::new();
316		while let Poll::Ready(Ok(Some(value))) = consumer.poll_next(&waiter) {
317			out.push(value);
318		}
319		out
320	}
321
322	#[test]
323	fn deltas_off_snapshot_per_group() {
324		let (mut producer, track) = producer(Config::default());
325		producer.update(&json!({ "a": 1 })).unwrap();
326		producer.update(&json!({ "a": 2 })).unwrap();
327		producer.finish().unwrap();
328
329		// Two updates => two groups, each a full snapshot. A consumer that joins after both
330		// exist only sees the latest, like the existing catalog consumer.
331		assert_eq!(track.latest(), Some(1));
332		assert_eq!(drain(track), vec![json!({ "a": 2 })]);
333	}
334
335	#[test]
336	fn live_consumer_sees_each_update() {
337		let (mut producer, track) = producer(Config::default());
338		let mut consumer = Consumer::<Value>::new(track);
339		let waiter = kio::Waiter::noop();
340
341		for n in 1..=3 {
342			producer.update(&json!({ "a": n })).unwrap();
343			match consumer.poll_next(&waiter) {
344				Poll::Ready(Ok(Some(value))) => assert_eq!(value, json!({ "a": n })),
345				other => panic!("expected value, got {other:?}"),
346			}
347		}
348	}
349
350	#[test]
351	fn unchanged_value_writes_nothing() {
352		let (mut producer, track) = producer(Config::default());
353		producer.update(&json!({ "a": 1 })).unwrap();
354		producer.update(&json!({ "a": 1 })).unwrap();
355		producer.finish().unwrap();
356
357		assert_eq!(track.latest(), Some(0));
358		assert_eq!(drain(track), vec![json!({ "a": 1 })]);
359	}
360
361	#[test]
362	fn deltas_share_one_group() {
363		let config = Config {
364			delta_ratio: Some(100.0),
365		};
366		let (mut producer, track) = producer(config);
367		producer.update(&json!({ "a": 1, "b": 1 })).unwrap();
368		producer.update(&json!({ "a": 1, "b": 2 })).unwrap();
369		producer.update(&json!({ "a": 1, "b": 3 })).unwrap();
370		producer.finish().unwrap();
371
372		// All updates fit in a single group as snapshot + deltas.
373		assert_eq!(track.latest(), Some(0));
374		let values = drain(track);
375		assert_eq!(values.last().unwrap(), &json!({ "a": 1, "b": 3 }));
376	}
377
378	#[test]
379	fn tight_ratio_rolls_snapshots() {
380		// A ratio of 1.0 leaves no room for any delta past the snapshot, so every change rolls.
381		let config = Config { delta_ratio: Some(1.0) };
382		let (mut producer, track) = producer(config);
383		producer.update(&json!({ "a": 1 })).unwrap();
384		producer.update(&json!({ "a": 2 })).unwrap();
385		producer.update(&json!({ "a": 3 })).unwrap();
386		producer.finish().unwrap();
387
388		assert_eq!(track.latest(), Some(2));
389	}
390
391	#[test]
392	fn array_change_is_delta() {
393		let config = Config {
394			delta_ratio: Some(100.0),
395		};
396		let (mut producer, track) = producer(config);
397		producer.update(&json!({ "list": [1, 2] })).unwrap();
398		producer.update(&json!({ "list": [1, 2, 3] })).unwrap();
399		producer.finish().unwrap();
400
401		// The array is replaced wholesale in a delta, so it stays in the same group.
402		assert_eq!(track.latest(), Some(0));
403		assert_eq!(drain(track).last().unwrap(), &json!({ "list": [1, 2, 3] }));
404	}
405
406	#[test]
407	fn frame_cap_rolls_snapshot() {
408		let config = Config {
409			delta_ratio: Some(1_000_000.0),
410		};
411		let (mut producer, track) = producer(config);
412		// First update is the snapshot (frame 0); then MAX_DELTA_FRAMES - 1 deltas fill the group.
413		for i in 0..=MAX_DELTA_FRAMES {
414			producer.update(&json!({ "n": i })).unwrap();
415		}
416		producer.finish().unwrap();
417
418		// The frame cap forced exactly one extra snapshot group despite the huge ratio.
419		assert_eq!(track.latest(), Some(1));
420		assert_eq!(drain(track).last().unwrap(), &json!({ "n": MAX_DELTA_FRAMES }));
421	}
422
423	#[test]
424	fn late_joiner_reconstructs_from_deltas() {
425		let config = Config {
426			delta_ratio: Some(100.0),
427		};
428		let (mut producer, track) = producer(config);
429		producer.update(&json!({ "a": 1, "b": 1 })).unwrap();
430		producer.update(&json!({ "a": 1, "b": 2 })).unwrap();
431		producer.update(&json!({ "a": 5, "b": 2 })).unwrap();
432		producer.finish().unwrap();
433
434		// A consumer created only now still rebuilds the final value from snapshot + deltas.
435		assert_eq!(drain(track).last().unwrap(), &json!({ "a": 5, "b": 2 }));
436	}
437
438	#[test]
439	fn newer_group_supersedes_in_progress_reconstruction() {
440		// A tight ratio lets one delta fit, then forces the next update into a new snapshot group.
441		let config = Config { delta_ratio: Some(2.0) };
442		let (mut producer, track) = producer(config);
443		let observer = producer.consume();
444		let mut consumer = Consumer::<Value>::new(track);
445		let waiter = kio::Waiter::noop();
446
447		producer.update(&json!({ "a": 1 })).unwrap(); // snapshot, group 0
448		match consumer.poll_next(&waiter) {
449			Poll::Ready(Ok(Some(value))) => assert_eq!(value, json!({ "a": 1 })),
450			other => panic!("expected first value, got {other:?}"),
451		}
452
453		producer.update(&json!({ "a": 2 })).unwrap(); // delta in group 0
454		producer.update(&json!({ "a": 3 })).unwrap(); // exceeds budget, rolls group 1
455		producer.finish().unwrap();
456		assert_eq!(observer.latest(), Some(1));
457
458		// The consumer jumps to the newest group and never yields a stale value.
459		let mut last = None;
460		while let Poll::Ready(Ok(Some(value))) = consumer.poll_next(&waiter) {
461			last = Some(value);
462		}
463		assert_eq!(last.unwrap(), json!({ "a": 3 }));
464	}
465}