1mod diff;
13
14use std::marker::PhantomData;
15use std::ops::{Deref, DerefMut};
16use std::sync::{Arc, Mutex, MutexGuard};
17use std::task::Poll;
18
19use serde::Serialize;
20use serde::de::DeserializeOwned;
21use serde_json::Value;
22
23use crate::diff::diff;
24
/// Maximum number of frames (the leading snapshot included) written to one group; once a group
/// holds this many, the next update starts a new group with a fresh snapshot.
const MAX_DELTA_FRAMES: usize = 256;
30
31#[derive(thiserror::Error, Debug, Clone)]
33#[non_exhaustive]
34pub enum Error {
35 #[error(transparent)]
37 Net(#[from] moq_net::Error),
38
39 #[error("json: {0}")]
43 Json(String),
44}
45
46impl From<serde_json::Error> for Error {
47 fn from(err: serde_json::Error) -> Self {
48 Error::Json(err.to_string())
49 }
50}
51
52pub type Result<T> = std::result::Result<T, Error>;
54
/// Tuning options for a [`Producer`].
#[derive(Clone, Debug, Default)]
pub struct Config {
    /// Delta budget, relative to the size of the current value's snapshot.
    ///
    /// `None` (the default) disables deltas: every update is written as a snapshot in its own
    /// group. With `Some(ratio)`, an update is appended to the open group as a delta only while
    /// the bytes already in that group plus the new delta do not exceed
    /// `ratio * snapshot_len`; otherwise a new group is started with a snapshot.
    pub delta_ratio: Option<f64>,
}
67
68pub struct Producer<T> {
73 inner: Arc<Mutex<Inner>>,
74 _marker: PhantomData<fn(T)>,
75}
76
77impl<T> Clone for Producer<T> {
78 fn clone(&self) -> Self {
79 Self {
80 inner: self.inner.clone(),
81 _marker: PhantomData,
82 }
83 }
84}
85
86impl<T> Producer<T> {
87 pub fn consume(&self) -> moq_net::TrackConsumer {
89 self.inner.lock().unwrap().track.consume()
90 }
91}
92
93impl<T: Serialize> Producer<T> {
94 pub fn new(track: moq_net::TrackProducer, config: Config) -> Self {
96 Self {
97 inner: Arc::new(Mutex::new(Inner {
98 track,
99 group: None,
100 last: None,
101 group_bytes: 0,
102 group_frames: 0,
103 config,
104 })),
105 _marker: PhantomData,
106 }
107 }
108
109 pub fn update(&mut self, value: &T) -> Result<()> {
113 let json = serde_json::to_value(value)?;
114 let snapshot = serde_json::to_vec(value)?;
117 self.inner.lock().unwrap().update(json, snapshot)
118 }
119
120 pub fn lock(&mut self) -> Guard<'_, T>
131 where
132 T: Default + DeserializeOwned,
133 {
134 let inner = self.inner.lock().unwrap();
135 let value = inner
136 .last
137 .as_ref()
138 .and_then(|last| serde_json::from_value(last.clone()).ok())
139 .unwrap_or_default();
140
141 Guard {
142 inner,
143 value,
144 dirty: false,
145 }
146 }
147
148 pub fn finish(&mut self) -> Result<()> {
150 self.inner.lock().unwrap().finish()
151 }
152}
153
154pub struct Guard<'a, T: Serialize> {
159 inner: MutexGuard<'a, Inner>,
160 value: T,
161 dirty: bool,
162}
163
164impl<T: Serialize> Deref for Guard<'_, T> {
165 type Target = T;
166
167 fn deref(&self) -> &T {
168 &self.value
169 }
170}
171
172impl<T: Serialize> DerefMut for Guard<'_, T> {
173 fn deref_mut(&mut self) -> &mut T {
174 self.dirty = true;
175 &mut self.value
176 }
177}
178
179impl<T: Serialize> Drop for Guard<'_, T> {
180 fn drop(&mut self) {
181 if !self.dirty {
182 return;
183 }
184
185 let Ok(json) = serde_json::to_value(&self.value) else {
186 return;
187 };
188 let Ok(snapshot) = serde_json::to_vec(&self.value) else {
189 return;
190 };
191
192 let _ = self.inner.update(json, snapshot);
194 }
195}
196
197struct Inner {
199 track: moq_net::TrackProducer,
200 group: Option<moq_net::GroupProducer>,
201 last: Option<Value>,
202 group_bytes: u64,
203 group_frames: usize,
204 config: Config,
205}
206
207impl Inner {
208 fn update(&mut self, json: Value, snapshot: Vec<u8>) -> Result<()> {
209 if self.last.as_ref() == Some(&json) {
210 return Ok(());
211 }
212
213 match self.delta(&json, snapshot.len())? {
214 Some(delta) => {
215 let group = self.group.as_mut().expect("delta requires an open group");
216 let len = delta.len() as u64;
217 group.write_frame(delta)?;
218 self.group_bytes += len;
219 self.group_frames += 1;
220 }
221 None => self.snapshot(snapshot)?,
222 }
223
224 self.last = Some(json);
225 Ok(())
226 }
227
228 fn delta(&self, value: &Value, snapshot_len: usize) -> Result<Option<Vec<u8>>> {
231 let Some(ratio) = self.config.delta_ratio else {
232 return Ok(None);
233 };
234 let Some(last) = &self.last else {
235 return Ok(None);
236 };
237 if self.group.is_none() || self.group_frames >= MAX_DELTA_FRAMES {
238 return Ok(None);
239 }
240
241 let diff = diff(last, value);
242 if diff.forced_snapshot {
243 return Ok(None);
244 }
245
246 let delta = serde_json::to_vec(&diff.patch)?;
247
248 let projected = (self.group_bytes + delta.len() as u64) as f64;
250 if projected > ratio * snapshot_len as f64 {
251 return Ok(None);
252 }
253
254 Ok(Some(delta))
255 }
256
257 fn snapshot(&mut self, snapshot: Vec<u8>) -> Result<()> {
259 if let Some(mut group) = self.group.take() {
261 group.finish()?;
262 }
263
264 let len = snapshot.len() as u64;
265 let mut group = self.track.append_group()?;
266 group.write_frame(snapshot)?;
267 self.group_bytes = len;
268 self.group_frames = 1;
269
270 if self.config.delta_ratio.is_some() {
271 self.group = Some(group);
273 } else {
274 group.finish()?;
276 }
277
278 Ok(())
279 }
280
281 fn finish(&mut self) -> Result<()> {
282 if let Some(mut group) = self.group.take() {
283 group.finish()?;
284 }
285 self.track.finish()?;
286 Ok(())
287 }
288}
289
290pub struct Consumer<T> {
292 track: moq_net::TrackConsumer,
293 group: Option<moq_net::GroupConsumer>,
294 current: Option<Value>,
295 frames_read: usize,
296 _marker: PhantomData<fn() -> T>,
297}
298
299impl<T> Clone for Consumer<T> {
302 fn clone(&self) -> Self {
303 Self {
304 track: self.track.clone(),
305 group: self.group.clone(),
306 current: self.current.clone(),
307 frames_read: self.frames_read,
308 _marker: PhantomData,
309 }
310 }
311}
312
313impl<T: DeserializeOwned> Consumer<T> {
314 pub fn new(track: moq_net::TrackConsumer) -> Self {
316 Self {
317 track,
318 group: None,
319 current: None,
320 frames_read: 0,
321 _marker: PhantomData,
322 }
323 }
324
325 pub async fn next(&mut self) -> Result<Option<T>>
327 where
328 T: Unpin,
329 {
330 kio::wait(|waiter| self.poll_next(waiter)).await
331 }
332
333 pub fn poll_next(&mut self, waiter: &kio::Waiter) -> Poll<Result<Option<T>>> {
338 let track_finished = loop {
340 match self.track.poll_next_group(waiter)? {
341 Poll::Ready(Some(group)) => {
342 self.group = Some(group);
343 self.current = None;
344 self.frames_read = 0;
345 }
346 Poll::Ready(None) => break true,
347 Poll::Pending => break false,
348 }
349 };
350
351 if let Some(group) = &mut self.group {
352 match group.poll_read_frame(waiter)? {
353 Poll::Ready(Some(frame)) => return Poll::Ready(Ok(Some(self.apply(frame)?))),
354 Poll::Ready(None) => self.group = None,
356 Poll::Pending => return Poll::Pending,
357 }
358 }
359
360 if track_finished {
361 Poll::Ready(Ok(None))
362 } else {
363 Poll::Pending
364 }
365 }
366
367 fn apply(&mut self, frame: bytes::Bytes) -> Result<T> {
369 if self.frames_read == 0 {
370 self.current = Some(serde_json::from_slice(&frame)?);
371 } else {
372 let patch: Value = serde_json::from_slice(&frame)?;
373 let current = self.current.as_mut().expect("a snapshot precedes any delta");
374 json_patch::merge(current, &patch);
375 }
376 self.frames_read += 1;
377
378 let current = self
379 .current
380 .as_ref()
381 .expect("a value is present after applying a frame");
382 Ok(serde_json::from_value(current.clone())?)
383 }
384}
385
#[cfg(test)]
mod test {
    use super::*;
    use serde_json::json;

    /// Builds a producer over a fresh track, plus a consumer handle for that track.
    fn producer(config: Config) -> (Producer<Value>, moq_net::TrackConsumer) {
        let track = moq_net::Track::new("test").produce();
        let reader = track.consume();
        let writer = Producer::new(track, config);
        (writer, reader)
    }

    /// Config with deltas enabled at the given ratio.
    fn with_ratio(ratio: f64) -> Config {
        Config {
            delta_ratio: Some(ratio),
        }
    }

    /// Collects values until the consumer stops yielding a ready `Ok(Some(_))`.
    fn drain(track: moq_net::TrackConsumer) -> Vec<Value> {
        let waiter = kio::Waiter::noop();
        let mut consumer = Consumer::<Value>::new(track);
        std::iter::from_fn(|| match consumer.poll_next(&waiter) {
            Poll::Ready(Ok(Some(value))) => Some(value),
            _ => None,
        })
        .collect()
    }

    /// With deltas off, two distinct updates leave `latest()` at 1 and a consumer that
    /// starts afterwards only sees the final value.
    #[test]
    fn deltas_off_snapshot_per_group() {
        let (mut publisher, track) = producer(Config::default());
        publisher.update(&json!({ "a": 1 })).unwrap();
        publisher.update(&json!({ "a": 2 })).unwrap();
        publisher.finish().unwrap();

        assert_eq!(track.latest(), Some(1));
        let seen = drain(track);
        assert_eq!(seen, vec![json!({ "a": 2 })]);
    }

    /// A consumer polled after every update observes each value in turn.
    #[test]
    fn live_consumer_sees_each_update() {
        let (mut publisher, track) = producer(Config::default());
        let waiter = kio::Waiter::noop();
        let mut consumer = Consumer::<Value>::new(track);

        for n in 1..=3 {
            let expected = json!({ "a": n });
            publisher.update(&expected).unwrap();
            match consumer.poll_next(&waiter) {
                Poll::Ready(Ok(Some(value))) => assert_eq!(value, json!({ "a": n })),
                other => panic!("expected value, got {other:?}"),
            }
        }
    }

    /// Publishing an identical value a second time writes nothing.
    #[test]
    fn unchanged_value_writes_nothing() {
        let (mut publisher, track) = producer(Config::default());
        for _ in 0..2 {
            publisher.update(&json!({ "a": 1 })).unwrap();
        }
        publisher.finish().unwrap();

        assert_eq!(track.latest(), Some(0));
        let seen = drain(track);
        assert_eq!(seen, vec![json!({ "a": 1 })]);
    }

    /// With a generous ratio, `latest()` stays at 0 across three updates.
    #[test]
    fn deltas_share_one_group() {
        let (mut publisher, track) = producer(with_ratio(100.0));
        publisher.update(&json!({ "a": 1, "b": 1 })).unwrap();
        publisher.update(&json!({ "a": 1, "b": 2 })).unwrap();
        publisher.update(&json!({ "a": 1, "b": 3 })).unwrap();
        publisher.finish().unwrap();

        assert_eq!(track.latest(), Some(0));
        let seen = drain(track);
        assert_eq!(seen.last().unwrap(), &json!({ "a": 1, "b": 3 }));
    }

    /// With a ratio of 1.0, three updates leave `latest()` at 2.
    #[test]
    fn tight_ratio_rolls_snapshots() {
        let (mut publisher, track) = producer(with_ratio(1.0));
        publisher.update(&json!({ "a": 1 })).unwrap();
        publisher.update(&json!({ "a": 2 })).unwrap();
        publisher.update(&json!({ "a": 3 })).unwrap();
        publisher.finish().unwrap();

        assert_eq!(track.latest(), Some(2));
    }

    /// A changed array leaves `latest()` at 0 and the consumer ends with the new array.
    #[test]
    fn array_change_is_delta() {
        let (mut publisher, track) = producer(with_ratio(100.0));
        publisher.update(&json!({ "list": [1, 2] })).unwrap();
        publisher.update(&json!({ "list": [1, 2, 3] })).unwrap();
        publisher.finish().unwrap();

        assert_eq!(track.latest(), Some(0));
        let seen = drain(track);
        assert_eq!(seen.last().unwrap(), &json!({ "list": [1, 2, 3] }));
    }

    /// Even with a huge byte budget, `MAX_DELTA_FRAMES + 1` updates leave `latest()` at 1.
    #[test]
    fn frame_cap_rolls_snapshot() {
        let (mut publisher, track) = producer(with_ratio(1_000_000.0));
        for n in 0..=MAX_DELTA_FRAMES {
            publisher.update(&json!({ "n": n })).unwrap();
        }
        publisher.finish().unwrap();

        assert_eq!(track.latest(), Some(1));
        let seen = drain(track);
        assert_eq!(seen.last().unwrap(), &json!({ "n": MAX_DELTA_FRAMES }));
    }

    /// A consumer that starts after everything was written still ends on the final value.
    #[test]
    fn late_joiner_reconstructs_from_deltas() {
        let (mut publisher, track) = producer(with_ratio(100.0));
        publisher.update(&json!({ "a": 1, "b": 1 })).unwrap();
        publisher.update(&json!({ "a": 1, "b": 2 })).unwrap();
        publisher.update(&json!({ "a": 5, "b": 2 })).unwrap();
        publisher.finish().unwrap();

        let seen = drain(track);
        assert_eq!(seen.last().unwrap(), &json!({ "a": 5, "b": 2 }));
    }

    /// Edits made through separate `lock()` guards accumulate into one document.
    #[test]
    fn lock_composes_independent_owners() {
        #[derive(serde::Serialize, serde::Deserialize, Default, PartialEq, Debug)]
        struct Doc {
            #[serde(skip_serializing_if = "Option::is_none")]
            video: Option<String>,
            #[serde(skip_serializing_if = "Option::is_none")]
            scte35: Option<u32>,
        }

        let track = moq_net::Track::new("test").produce();
        let reader = track.consume();
        let mut publisher = Producer::<Doc>::new(track, Config::default());

        // Each guard is a temporary, so it is dropped at the end of its statement.
        publisher.lock().video = Some("v1".to_string());

        publisher.lock().scte35 = Some(42);

        // A guard that is never mutably dereferenced.
        drop(publisher.lock());

        publisher.finish().unwrap();

        let waiter = kio::Waiter::noop();
        let mut consumer = Consumer::<Doc>::new(reader);
        let mut last = None;
        while let Poll::Ready(Ok(Some(value))) = consumer.poll_next(&waiter) {
            last = Some(value);
        }

        let expected = Doc {
            video: Some("v1".to_string()),
            scte35: Some(42),
        };
        assert_eq!(last.unwrap(), expected);
    }

    /// A consumer that already read from one group ends on the final value after
    /// `latest()` has moved on to 1.
    #[test]
    fn newer_group_supersedes_in_progress_reconstruction() {
        let (mut publisher, track) = producer(with_ratio(2.0));
        let observer = publisher.consume();
        let waiter = kio::Waiter::noop();
        let mut consumer = Consumer::<Value>::new(track);

        publisher.update(&json!({ "a": 1 })).unwrap();
        match consumer.poll_next(&waiter) {
            Poll::Ready(Ok(Some(value))) => assert_eq!(value, json!({ "a": 1 })),
            other => panic!("expected first value, got {other:?}"),
        }

        publisher.update(&json!({ "a": 2 })).unwrap();
        publisher.update(&json!({ "a": 3 })).unwrap();
        publisher.finish().unwrap();
        assert_eq!(observer.latest(), Some(1));

        let mut last = None;
        while let Poll::Ready(Ok(Some(value))) = consumer.poll_next(&waiter) {
            last = Some(value);
        }
        assert_eq!(last.unwrap(), json!({ "a": 3 }));
    }

    /// A consumer cloned after the snapshot was read yields the same next value as the
    /// original.
    #[test]
    fn cloned_consumer_reconstructs_independently() {
        let (mut publisher, track) = producer(with_ratio(100.0));
        let waiter = kio::Waiter::noop();
        let mut consumer = Consumer::<Value>::new(track);

        publisher.update(&json!({ "a": 1, "b": 1 })).unwrap();
        match consumer.poll_next(&waiter) {
            Poll::Ready(Ok(Some(value))) => assert_eq!(value, json!({ "a": 1, "b": 1 })),
            other => panic!("expected snapshot, got {other:?}"),
        }

        let mut clone = consumer.clone();

        publisher.update(&json!({ "a": 1, "b": 2 })).unwrap();
        publisher.finish().unwrap();

        let expected = json!({ "a": 1, "b": 2 });
        for reader in [&mut consumer, &mut clone] {
            match reader.poll_next(&waiter) {
                Poll::Ready(Ok(Some(value))) => assert_eq!(value, expected),
                other => panic!("expected delta, got {other:?}"),
            }
        }
    }
}