Skip to main content

moq_json/
lib.rs

//! Snapshot/delta JSON publishing over [`moq-net`](moq_net) tracks.
//!
//! A JSON value is published over a track as a series of groups, where each group is
//! self-contained: its first frame is a full snapshot and any following frames are
//! [RFC 7396](https://www.rfc-editor.org/rfc/rfc7396.html) JSON Merge Patch deltas applied in
//! order. A consumer jumps to the newest group, reads the snapshot, and applies the deltas, so
//! a late joiner never needs older groups.
//!
//! Deltas are opt-in via [`Config::delta_ratio`]. With deltas disabled (the default)
//! every change is a fresh snapshot group, matching a plain "one JSON blob per group" track.
11
12mod diff;
13
14use std::marker::PhantomData;
15use std::ops::{Deref, DerefMut};
16use std::sync::{Arc, Mutex, MutexGuard};
17use std::task::Poll;
18
19use serde::Serialize;
20use serde::de::DeserializeOwned;
21use serde_json::Value;
22
23use crate::diff::diff;
24
/// Maximum frames (snapshot + deltas) in a single group before a new snapshot is forced.
///
/// Kept well below moq-net's per-group frame cap so a late joiner can always read the snapshot
/// at frame 0 before the group is evicted.
///
/// The producer compares its per-group frame counter against this value before appending a
/// delta; once the counter reaches it, the next change starts a new snapshot group instead.
const MAX_DELTA_FRAMES: usize = 256;
30
/// Errors produced while publishing or consuming JSON.
///
/// The type is [`Clone`], which is why JSON failures are carried as rendered text rather
/// than as the original [`serde_json::Error`].
#[derive(thiserror::Error, Debug, Clone)]
#[non_exhaustive]
pub enum Error {
	/// An error from the underlying track.
	///
	/// Converted automatically from [`moq_net::Error`] (via `#[from]`), so `?` works on track
	/// and group operations.
	#[error(transparent)]
	Net(#[from] moq_net::Error),

	/// A value failed to serialize, deserialize, or apply as a merge patch.
	///
	/// Stored as a string since [`serde_json::Error`] is not [`Clone`].
	#[error("json: {0}")]
	Json(String),
}
45
46impl From<serde_json::Error> for Error {
47	fn from(err: serde_json::Error) -> Self {
48		Error::Json(err.to_string())
49	}
50}
51
/// A [`Result`](std::result::Result) using this crate's [`Error`].
///
/// Returned by the fallible operations on [`Producer`] and [`Consumer`].
pub type Result<T> = std::result::Result<T, Error>;
54
/// Configuration for a [`Producer`].
///
/// The derived [`Default`] leaves [`delta_ratio`](Config::delta_ratio) as `None`, i.e. deltas
/// disabled.
#[derive(Debug, Clone, Default)]
pub struct Config {
	/// Controls whether the producer emits deltas (merge patches) instead of full snapshots.
	///
	/// `None` disables deltas: every change is published as a new snapshot group.
	///
	/// `Some(ratio)` enables deltas. A delta is appended to the current group as long as the
	/// group's total size stays within `ratio` times the size of a fresh snapshot; otherwise a
	/// new snapshot group is started. A larger ratio tolerates bigger groups before snapshotting.
	///
	/// Sizes are measured in serialized bytes: the group's total is its snapshot frame plus
	/// every delta written so far plus the candidate delta, and it is compared against `ratio`
	/// times the byte length of the snapshot that would be written for the new value.
	pub delta_ratio: Option<f64>,
}
67
/// Publishes a JSON value over a track, choosing snapshots and deltas automatically.
///
/// Cheaply clonable: clones share one underlying track and publishing state, like other MoQ
/// producers.
pub struct Producer<T> {
	/// Publishing state shared by every clone of this producer.
	inner: Arc<Mutex<Inner>>,
	/// `T` is only a type-level tag for what gets published; no value of `T` is stored here.
	_marker: PhantomData<fn(T)>,
}
76
77impl<T> Clone for Producer<T> {
78	fn clone(&self) -> Self {
79		Self {
80			inner: self.inner.clone(),
81			_marker: PhantomData,
82		}
83	}
84}
85
86impl<T> Producer<T> {
87	/// Create a subscriber for the underlying track.
88	pub fn consume(&self) -> moq_net::TrackConsumer {
89		self.inner.lock().unwrap().track.consume()
90	}
91}
92
93impl<T: Serialize> Producer<T> {
94	/// Create a producer that publishes to the given track.
95	pub fn new(track: moq_net::TrackProducer, config: Config) -> Self {
96		Self {
97			inner: Arc::new(Mutex::new(Inner {
98				track,
99				group: None,
100				last: None,
101				group_bytes: 0,
102				group_frames: 0,
103				config,
104			})),
105			_marker: PhantomData,
106		}
107	}
108
109	/// Publish a new value, emitting a snapshot or a delta automatically.
110	///
111	/// Does nothing if the value is unchanged from the previous publish.
112	pub fn update(&mut self, value: &T) -> Result<()> {
113		let json = serde_json::to_value(value)?;
114		// Serialize the value directly (not via `json`) so a snapshot preserves the type's own
115		// field order, keeping the wire bytes identical to serializing `T` straight to a frame.
116		let snapshot = serde_json::to_vec(value)?;
117		self.inner.lock().unwrap().update(json, snapshot)
118	}
119
120	/// Lock the current value for in-place editing, publishing on drop.
121	///
122	/// The returned [`Guard`] derefs to the last-published value (or `T::default()` if nothing has
123	/// been published yet). Editing it through [`DerefMut`] marks the guard dirty; when a dirty
124	/// guard drops it publishes the result, a no-op if unchanged.
125	///
126	/// This is the counterpart to a callback: hold the guard, mutate, drop. The guard holds the
127	/// producer's lock for its lifetime, so independent owners are serialized: each one starts from
128	/// the latest value and their changes compose instead of clobbering. Don't hold a guard across
129	/// an `.await`, since that keeps the lock held while suspended.
130	pub fn lock(&mut self) -> Guard<'_, T>
131	where
132		T: Default + DeserializeOwned,
133	{
134		let inner = self.inner.lock().unwrap();
135		let value = inner
136			.last
137			.as_ref()
138			.and_then(|last| serde_json::from_value(last.clone()).ok())
139			.unwrap_or_default();
140
141		Guard {
142			inner,
143			value,
144			dirty: false,
145		}
146	}
147
148	/// Finish the track, closing any open group.
149	pub fn finish(&mut self) -> Result<()> {
150		self.inner.lock().unwrap().finish()
151	}
152}
153
/// An RAII editing guard returned by [`Producer::lock`].
///
/// Holds the producer's lock for its lifetime and derefs to the current value. Mutating it through
/// [`DerefMut`] marks it dirty, and dropping a dirty guard publishes the edited value.
pub struct Guard<'a, T: Serialize> {
	/// The held lock on the producer's shared state; the drop handler publishes through it.
	inner: MutexGuard<'a, Inner>,
	/// The working copy being edited.
	value: T,
	/// Set once the value has been borrowed mutably; a clean guard publishes nothing on drop.
	dirty: bool,
}
163
164impl<T: Serialize> Deref for Guard<'_, T> {
165	type Target = T;
166
167	fn deref(&self) -> &T {
168		&self.value
169	}
170}
171
172impl<T: Serialize> DerefMut for Guard<'_, T> {
173	fn deref_mut(&mut self) -> &mut T {
174		self.dirty = true;
175		&mut self.value
176	}
177}
178
179impl<T: Serialize> Drop for Guard<'_, T> {
180	fn drop(&mut self) {
181		if !self.dirty {
182			return;
183		}
184
185		let Ok(json) = serde_json::to_value(&self.value) else {
186			return;
187		};
188		let Ok(snapshot) = serde_json::to_vec(&self.value) else {
189			return;
190		};
191
192		// We already hold the lock, so publish through the held guard rather than re-locking.
193		let _ = self.inner.update(json, snapshot);
194	}
195}
196
/// Shared publishing state behind [`Producer`]'s `Arc<Mutex>`.
struct Inner {
	/// The track that new groups are appended to.
	track: moq_net::TrackProducer,
	/// The group still open for appending deltas, if any. Only kept when deltas are enabled.
	group: Option<moq_net::GroupProducer>,
	/// The most recently published value, used for change detection and as the diff base.
	last: Option<Value>,
	/// Bytes written to the current group so far (snapshot frame plus deltas).
	group_bytes: u64,
	/// Frames written to the current group so far (snapshot frame plus deltas).
	group_frames: usize,
	/// The producer's configuration.
	config: Config,
}
206
207impl Inner {
208	fn update(&mut self, json: Value, snapshot: Vec<u8>) -> Result<()> {
209		if self.last.as_ref() == Some(&json) {
210			return Ok(());
211		}
212
213		match self.delta(&json, snapshot.len())? {
214			Some(delta) => {
215				let group = self.group.as_mut().expect("delta requires an open group");
216				let len = delta.len() as u64;
217				group.write_frame(delta)?;
218				self.group_bytes += len;
219				self.group_frames += 1;
220			}
221			None => self.snapshot(snapshot)?,
222		}
223
224		self.last = Some(json);
225		Ok(())
226	}
227
228	/// Serialize a delta if deltas are enabled and appending one keeps the group within budget;
229	/// otherwise `None`, signalling that a fresh snapshot should be published instead.
230	fn delta(&self, value: &Value, snapshot_len: usize) -> Result<Option<Vec<u8>>> {
231		let Some(ratio) = self.config.delta_ratio else {
232			return Ok(None);
233		};
234		let Some(last) = &self.last else {
235			return Ok(None);
236		};
237		if self.group.is_none() || self.group_frames >= MAX_DELTA_FRAMES {
238			return Ok(None);
239		}
240
241		let diff = diff(last, value);
242		if diff.forced_snapshot {
243			return Ok(None);
244		}
245
246		let delta = serde_json::to_vec(&diff.patch)?;
247
248		// Roll a snapshot if appending the delta would bloat the group past the budget.
249		let projected = (self.group_bytes + delta.len() as u64) as f64;
250		if projected > ratio * snapshot_len as f64 {
251			return Ok(None);
252		}
253
254		Ok(Some(delta))
255	}
256
257	/// Start a new group with a full snapshot as its first frame.
258	fn snapshot(&mut self, snapshot: Vec<u8>) -> Result<()> {
259		// The previous group is complete; no more frames will be appended to it.
260		if let Some(mut group) = self.group.take() {
261			group.finish()?;
262		}
263
264		let len = snapshot.len() as u64;
265		let mut group = self.track.append_group()?;
266		group.write_frame(snapshot)?;
267		self.group_bytes = len;
268		self.group_frames = 1;
269
270		if self.config.delta_ratio.is_some() {
271			// Keep the group open so future deltas can be appended.
272			self.group = Some(group);
273		} else {
274			// Deltas disabled: one frame per group, identical to a plain JSON track.
275			group.finish()?;
276		}
277
278		Ok(())
279	}
280
281	fn finish(&mut self) -> Result<()> {
282		if let Some(mut group) = self.group.take() {
283			group.finish()?;
284		}
285		self.track.finish()?;
286		Ok(())
287	}
288}
289
/// Consumes a JSON value from a track, reconstructing it from snapshots and deltas.
pub struct Consumer<T> {
	/// The track subscription that groups are pulled from.
	track: moq_net::TrackConsumer,
	/// The group currently being read, if any.
	group: Option<moq_net::GroupConsumer>,
	/// The value reconstructed so far from the current group's frames.
	current: Option<Value>,
	/// Number of frames applied from the current group; `0` means the next frame is the snapshot.
	frames_read: usize,
	/// `T` is only the type values are decoded into; no value of `T` is stored here.
	_marker: PhantomData<fn() -> T>,
}
298
299// Manual impl so cloning doesn't require `T: Clone`; `T` only lives in PhantomData.
300// Cloned readers inherit the current reconstruction state, then advance in parallel.
301impl<T> Clone for Consumer<T> {
302	fn clone(&self) -> Self {
303		Self {
304			track: self.track.clone(),
305			group: self.group.clone(),
306			current: self.current.clone(),
307			frames_read: self.frames_read,
308			_marker: PhantomData,
309		}
310	}
311}
312
313impl<T: DeserializeOwned> Consumer<T> {
314	/// Create a consumer reading from the given track subscriber.
315	pub fn new(track: moq_net::TrackConsumer) -> Self {
316		Self {
317			track,
318			group: None,
319			current: None,
320			frames_read: 0,
321			_marker: PhantomData,
322		}
323	}
324
325	/// Get the next reconstructed value, or `None` once the track ends.
326	pub async fn next(&mut self) -> Result<Option<T>>
327	where
328		T: Unpin,
329	{
330		kio::wait(|waiter| self.poll_next(waiter)).await
331	}
332
333	/// Poll for the next reconstructed value, without blocking.
334	///
335	/// Jumps to the newest group, reads its snapshot, and applies deltas in order, yielding the
336	/// reconstructed value after each frame. Switching to a newer group discards the older one.
337	pub fn poll_next(&mut self, waiter: &kio::Waiter) -> Poll<Result<Option<T>>> {
338		// Drain to the newest group, resetting reconstruction state whenever we switch.
339		let track_finished = loop {
340			match self.track.poll_next_group(waiter)? {
341				Poll::Ready(Some(group)) => {
342					self.group = Some(group);
343					self.current = None;
344					self.frames_read = 0;
345				}
346				Poll::Ready(None) => break true,
347				Poll::Pending => break false,
348			}
349		};
350
351		if let Some(group) = &mut self.group {
352			match group.poll_read_frame(waiter)? {
353				Poll::Ready(Some(frame)) => return Poll::Ready(Ok(Some(self.apply(frame)?))),
354				// The current group is exhausted; wait for a newer one.
355				Poll::Ready(None) => self.group = None,
356				Poll::Pending => return Poll::Pending,
357			}
358		}
359
360		if track_finished {
361			Poll::Ready(Ok(None))
362		} else {
363			Poll::Pending
364		}
365	}
366
367	/// Apply one frame: frame 0 of a group is a snapshot, the rest are merge patches.
368	fn apply(&mut self, frame: bytes::Bytes) -> Result<T> {
369		if self.frames_read == 0 {
370			self.current = Some(serde_json::from_slice(&frame)?);
371		} else {
372			let patch: Value = serde_json::from_slice(&frame)?;
373			let current = self.current.as_mut().expect("a snapshot precedes any delta");
374			json_patch::merge(current, &patch);
375		}
376		self.frames_read += 1;
377
378		let current = self
379			.current
380			.as_ref()
381			.expect("a value is present after applying a frame");
382		Ok(serde_json::from_value(current.clone())?)
383	}
384}
385
// Unit tests driving a `Producer` and a `Consumer` over the same track, polled synchronously
// with a no-op waiter.
#[cfg(test)]
mod test {
	use super::*;
	use serde_json::json;

	/// Build a producer over a fresh track, plus a subscriber to that same track.
	fn producer(config: Config) -> (Producer<Value>, moq_net::TrackConsumer) {
		let track = moq_net::Track::new("test").produce();
		let consumer = track.consume();
		(Producer::new(track, config), consumer)
	}

	/// Drain every value currently available from a consumer without blocking.
	fn drain(track: moq_net::TrackConsumer) -> Vec<Value> {
		let mut consumer = Consumer::<Value>::new(track);
		let waiter = kio::Waiter::noop();
		let mut out = Vec::new();
		// Stops at the first Pending, end-of-track, or error.
		while let Poll::Ready(Ok(Some(value))) = consumer.poll_next(&waiter) {
			out.push(value);
		}
		out
	}

	/// With deltas disabled, each change gets its own snapshot group.
	#[test]
	fn deltas_off_snapshot_per_group() {
		let (mut producer, track) = producer(Config::default());
		producer.update(&json!({ "a": 1 })).unwrap();
		producer.update(&json!({ "a": 2 })).unwrap();
		producer.finish().unwrap();

		// Two updates => two groups, each a full snapshot. A consumer that joins after both
		// exist only sees the latest, like the existing catalog consumer.
		assert_eq!(track.latest(), Some(1));
		assert_eq!(drain(track), vec![json!({ "a": 2 })]);
	}

	/// A consumer polled after every publish observes each value in turn.
	#[test]
	fn live_consumer_sees_each_update() {
		let (mut producer, track) = producer(Config::default());
		let mut consumer = Consumer::<Value>::new(track);
		let waiter = kio::Waiter::noop();

		for n in 1..=3 {
			producer.update(&json!({ "a": n })).unwrap();
			match consumer.poll_next(&waiter) {
				Poll::Ready(Ok(Some(value))) => assert_eq!(value, json!({ "a": n })),
				other => panic!("expected value, got {other:?}"),
			}
		}
	}

	/// Publishing the same value twice writes only one group.
	#[test]
	fn unchanged_value_writes_nothing() {
		let (mut producer, track) = producer(Config::default());
		producer.update(&json!({ "a": 1 })).unwrap();
		producer.update(&json!({ "a": 1 })).unwrap();
		producer.finish().unwrap();

		// The second update was skipped, so group 0 is still the newest.
		assert_eq!(track.latest(), Some(0));
		assert_eq!(drain(track), vec![json!({ "a": 1 })]);
	}

	/// With a generous ratio, successive changes are appended to the first group as deltas.
	#[test]
	fn deltas_share_one_group() {
		let config = Config {
			delta_ratio: Some(100.0),
		};
		let (mut producer, track) = producer(config);
		producer.update(&json!({ "a": 1, "b": 1 })).unwrap();
		producer.update(&json!({ "a": 1, "b": 2 })).unwrap();
		producer.update(&json!({ "a": 1, "b": 3 })).unwrap();
		producer.finish().unwrap();

		// All updates fit in a single group as snapshot + deltas.
		assert_eq!(track.latest(), Some(0));
		let values = drain(track);
		assert_eq!(values.last().unwrap(), &json!({ "a": 1, "b": 3 }));
	}

	/// A ratio too small to fit any delta forces a new snapshot group for every change.
	#[test]
	fn tight_ratio_rolls_snapshots() {
		// A ratio of 1.0 leaves no room for any delta past the snapshot, so every change rolls.
		let config = Config { delta_ratio: Some(1.0) };
		let (mut producer, track) = producer(config);
		producer.update(&json!({ "a": 1 })).unwrap();
		producer.update(&json!({ "a": 2 })).unwrap();
		producer.update(&json!({ "a": 3 })).unwrap();
		producer.finish().unwrap();

		// Three updates => three groups (0, 1, 2).
		assert_eq!(track.latest(), Some(2));
	}

	/// Changing an array is published as a delta rather than forcing a snapshot.
	#[test]
	fn array_change_is_delta() {
		let config = Config {
			delta_ratio: Some(100.0),
		};
		let (mut producer, track) = producer(config);
		producer.update(&json!({ "list": [1, 2] })).unwrap();
		producer.update(&json!({ "list": [1, 2, 3] })).unwrap();
		producer.finish().unwrap();

		// The array is replaced wholesale in a delta, so it stays in the same group.
		assert_eq!(track.latest(), Some(0));
		assert_eq!(drain(track).last().unwrap(), &json!({ "list": [1, 2, 3] }));
	}

	/// Hitting `MAX_DELTA_FRAMES` rolls a new snapshot group even when the byte budget allows more.
	#[test]
	fn frame_cap_rolls_snapshot() {
		let config = Config {
			delta_ratio: Some(1_000_000.0),
		};
		let (mut producer, track) = producer(config);
		// First update is the snapshot (frame 0); then MAX_DELTA_FRAMES - 1 deltas fill the group.
		// The inclusive range issues one more update than that, which must start a new group.
		for i in 0..=MAX_DELTA_FRAMES {
			producer.update(&json!({ "n": i })).unwrap();
		}
		producer.finish().unwrap();

		// The frame cap forced exactly one extra snapshot group despite the huge ratio.
		assert_eq!(track.latest(), Some(1));
		assert_eq!(drain(track).last().unwrap(), &json!({ "n": MAX_DELTA_FRAMES }));
	}

	/// A consumer that subscribes after publishing has finished still reaches the final value.
	#[test]
	fn late_joiner_reconstructs_from_deltas() {
		let config = Config {
			delta_ratio: Some(100.0),
		};
		let (mut producer, track) = producer(config);
		producer.update(&json!({ "a": 1, "b": 1 })).unwrap();
		producer.update(&json!({ "a": 1, "b": 2 })).unwrap();
		producer.update(&json!({ "a": 5, "b": 2 })).unwrap();
		producer.finish().unwrap();

		// A consumer created only now still rebuilds the final value from snapshot + deltas.
		assert_eq!(drain(track).last().unwrap(), &json!({ "a": 5, "b": 2 }));
	}

	/// Edits made through successive guards build on each other instead of overwriting.
	#[test]
	fn lock_composes_independent_owners() {
		// Mirrors the catalog use case: separate owners each edit their own field through the guard.
		#[derive(serde::Serialize, serde::Deserialize, Default, PartialEq, Debug)]
		struct Doc {
			#[serde(skip_serializing_if = "Option::is_none")]
			video: Option<String>,
			#[serde(skip_serializing_if = "Option::is_none")]
			scte35: Option<u32>,
		}

		let track = moq_net::Track::new("test").produce();
		let consumer = track.consume();
		let mut producer = Producer::<Doc>::new(track, Config::default());

		// First owner sets its field.
		producer.lock().video = Some("v1".to_string());

		// Second owner starts from the latest value and adds its own field without clobbering.
		producer.lock().scte35 = Some(42);

		// Locking without mutating publishes nothing (the guard stays clean).
		let _ = producer.lock();

		producer.finish().unwrap();

		let mut consumer = Consumer::<Doc>::new(consumer);
		let waiter = kio::Waiter::noop();
		// Keep only the final value the consumer yields.
		let mut last = None;
		while let Poll::Ready(Ok(Some(value))) = consumer.poll_next(&waiter) {
			last = Some(value);
		}
		assert_eq!(
			last.unwrap(),
			Doc {
				video: Some("v1".to_string()),
				scte35: Some(42),
			}
		);
	}

	/// A consumer part-way through one group moves on to a newer group when one appears.
	#[test]
	fn newer_group_supersedes_in_progress_reconstruction() {
		// A tight ratio lets one delta fit, then forces the next update into a new snapshot group.
		let config = Config { delta_ratio: Some(2.0) };
		let (mut producer, track) = producer(config);
		// A second subscription, used only to check how many groups were produced.
		let observer = producer.consume();
		let mut consumer = Consumer::<Value>::new(track);
		let waiter = kio::Waiter::noop();

		producer.update(&json!({ "a": 1 })).unwrap(); // snapshot, group 0
		match consumer.poll_next(&waiter) {
			Poll::Ready(Ok(Some(value))) => assert_eq!(value, json!({ "a": 1 })),
			other => panic!("expected first value, got {other:?}"),
		}

		producer.update(&json!({ "a": 2 })).unwrap(); // delta in group 0
		producer.update(&json!({ "a": 3 })).unwrap(); // exceeds budget, rolls group 1
		producer.finish().unwrap();
		assert_eq!(observer.latest(), Some(1));

		// The consumer jumps to the newest group and never yields a stale value.
		let mut last = None;
		while let Poll::Ready(Ok(Some(value))) = consumer.poll_next(&waiter) {
			last = Some(value);
		}
		assert_eq!(last.unwrap(), json!({ "a": 3 }));
	}

	/// A consumer cloned mid-group applies later deltas on top of its inherited state.
	#[test]
	fn cloned_consumer_reconstructs_independently() {
		// Deltas share one group, so a clone taken mid-group carries in-progress reconstruction state.
		let config = Config {
			delta_ratio: Some(100.0),
		};
		let (mut producer, track) = producer(config);
		let mut consumer = Consumer::<Value>::new(track);
		let waiter = kio::Waiter::noop();

		producer.update(&json!({ "a": 1, "b": 1 })).unwrap(); // snapshot, group 0
		match consumer.poll_next(&waiter) {
			Poll::Ready(Ok(Some(value))) => assert_eq!(value, json!({ "a": 1, "b": 1 })),
			other => panic!("expected snapshot, got {other:?}"),
		}

		// Clone after the snapshot: the copy inherits `current`/`frames_read` and an independent cursor.
		let mut clone = consumer.clone();

		producer.update(&json!({ "a": 1, "b": 2 })).unwrap(); // delta in group 0
		producer.finish().unwrap();

		// Each consumer applies the delta on top of its own reconstruction state.
		let expected = json!({ "a": 1, "b": 2 });
		for consumer in [&mut consumer, &mut clone] {
			match consumer.poll_next(&waiter) {
				Poll::Ready(Ok(Some(value))) => assert_eq!(value, expected),
				other => panic!("expected delta, got {other:?}"),
			}
		}
	}
}