1mod diff;
13
14use std::marker::PhantomData;
15use std::sync::{Arc, Mutex};
16use std::task::Poll;
17
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use serde_json::Value;
21
22use crate::diff::diff;
23
24const MAX_DELTA_FRAMES: usize = 256;
29
30#[derive(thiserror::Error, Debug, Clone)]
32#[non_exhaustive]
33pub enum Error {
34 #[error(transparent)]
36 Net(#[from] moq_net::Error),
37
38 #[error("json: {0}")]
42 Json(String),
43}
44
45impl From<serde_json::Error> for Error {
46 fn from(err: serde_json::Error) -> Self {
47 Error::Json(err.to_string())
48 }
49}
50
51pub type Result<T> = std::result::Result<T, Error>;
53
54#[derive(Debug, Clone, Default)]
56pub struct Config {
57 pub delta_ratio: Option<f64>,
65}
66
67pub struct Producer<T> {
72 inner: Arc<Mutex<Inner>>,
73 _marker: PhantomData<fn(T)>,
74}
75
76impl<T> Clone for Producer<T> {
77 fn clone(&self) -> Self {
78 Self {
79 inner: self.inner.clone(),
80 _marker: PhantomData,
81 }
82 }
83}
84
85impl<T> Producer<T> {
86 pub fn consume(&self) -> moq_net::TrackConsumer {
88 self.inner.lock().unwrap().track.consume()
89 }
90}
91
92impl<T: Serialize> Producer<T> {
93 pub fn new(track: moq_net::TrackProducer, config: Config) -> Self {
95 Self {
96 inner: Arc::new(Mutex::new(Inner {
97 track,
98 group: None,
99 last: None,
100 group_bytes: 0,
101 group_frames: 0,
102 config,
103 })),
104 _marker: PhantomData,
105 }
106 }
107
108 pub fn update(&mut self, value: &T) -> Result<()> {
112 let json = serde_json::to_value(value)?;
113 let snapshot = serde_json::to_vec(value)?;
116 self.inner.lock().unwrap().update(json, snapshot)
117 }
118
119 pub fn finish(&mut self) -> Result<()> {
121 self.inner.lock().unwrap().finish()
122 }
123}
124
125struct Inner {
127 track: moq_net::TrackProducer,
128 group: Option<moq_net::GroupProducer>,
129 last: Option<Value>,
130 group_bytes: u64,
131 group_frames: usize,
132 config: Config,
133}
134
135impl Inner {
136 fn update(&mut self, json: Value, snapshot: Vec<u8>) -> Result<()> {
137 if self.last.as_ref() == Some(&json) {
138 return Ok(());
139 }
140
141 match self.delta(&json, snapshot.len())? {
142 Some(delta) => {
143 let group = self.group.as_mut().expect("delta requires an open group");
144 let len = delta.len() as u64;
145 group.write_frame(delta)?;
146 self.group_bytes += len;
147 self.group_frames += 1;
148 }
149 None => self.snapshot(snapshot)?,
150 }
151
152 self.last = Some(json);
153 Ok(())
154 }
155
156 fn delta(&self, value: &Value, snapshot_len: usize) -> Result<Option<Vec<u8>>> {
159 let Some(ratio) = self.config.delta_ratio else {
160 return Ok(None);
161 };
162 let Some(last) = &self.last else {
163 return Ok(None);
164 };
165 if self.group.is_none() || self.group_frames >= MAX_DELTA_FRAMES {
166 return Ok(None);
167 }
168
169 let diff = diff(last, value);
170 if diff.forced_snapshot {
171 return Ok(None);
172 }
173
174 let delta = serde_json::to_vec(&diff.patch)?;
175
176 let projected = (self.group_bytes + delta.len() as u64) as f64;
178 if projected > ratio * snapshot_len as f64 {
179 return Ok(None);
180 }
181
182 Ok(Some(delta))
183 }
184
185 fn snapshot(&mut self, snapshot: Vec<u8>) -> Result<()> {
187 if let Some(mut group) = self.group.take() {
189 group.finish()?;
190 }
191
192 let len = snapshot.len() as u64;
193 let mut group = self.track.append_group()?;
194 group.write_frame(snapshot)?;
195 self.group_bytes = len;
196 self.group_frames = 1;
197
198 if self.config.delta_ratio.is_some() {
199 self.group = Some(group);
201 } else {
202 group.finish()?;
204 }
205
206 Ok(())
207 }
208
209 fn finish(&mut self) -> Result<()> {
210 if let Some(mut group) = self.group.take() {
211 group.finish()?;
212 }
213 self.track.finish()?;
214 Ok(())
215 }
216}
217
218pub struct Consumer<T> {
220 track: moq_net::TrackConsumer,
221 group: Option<moq_net::GroupConsumer>,
222 current: Option<Value>,
223 frames_read: usize,
224 _marker: PhantomData<fn() -> T>,
225}
226
227impl<T: DeserializeOwned> Consumer<T> {
228 pub fn new(track: moq_net::TrackConsumer) -> Self {
230 Self {
231 track,
232 group: None,
233 current: None,
234 frames_read: 0,
235 _marker: PhantomData,
236 }
237 }
238
239 pub async fn next(&mut self) -> Result<Option<T>>
241 where
242 T: Unpin,
243 {
244 kio::wait(|waiter| self.poll_next(waiter)).await
245 }
246
247 pub fn poll_next(&mut self, waiter: &kio::Waiter) -> Poll<Result<Option<T>>> {
252 let track_finished = loop {
254 match self.track.poll_next_group(waiter)? {
255 Poll::Ready(Some(group)) => {
256 self.group = Some(group);
257 self.current = None;
258 self.frames_read = 0;
259 }
260 Poll::Ready(None) => break true,
261 Poll::Pending => break false,
262 }
263 };
264
265 if let Some(group) = &mut self.group {
266 match group.poll_read_frame(waiter)? {
267 Poll::Ready(Some(frame)) => return Poll::Ready(Ok(Some(self.apply(frame)?))),
268 Poll::Ready(None) => self.group = None,
270 Poll::Pending => return Poll::Pending,
271 }
272 }
273
274 if track_finished {
275 Poll::Ready(Ok(None))
276 } else {
277 Poll::Pending
278 }
279 }
280
281 fn apply(&mut self, frame: bytes::Bytes) -> Result<T> {
283 if self.frames_read == 0 {
284 self.current = Some(serde_json::from_slice(&frame)?);
285 } else {
286 let patch: Value = serde_json::from_slice(&frame)?;
287 let current = self.current.as_mut().expect("a snapshot precedes any delta");
288 json_patch::merge(current, &patch);
289 }
290 self.frames_read += 1;
291
292 let current = self
293 .current
294 .as_ref()
295 .expect("a value is present after applying a frame");
296 Ok(serde_json::from_value(current.clone())?)
297 }
298}
299
300#[cfg(test)]
301mod test {
302 use super::*;
303 use serde_json::json;
304
305 fn producer(config: Config) -> (Producer<Value>, moq_net::TrackConsumer) {
306 let track = moq_net::Track::new("test").produce();
307 let consumer = track.consume();
308 (Producer::new(track, config), consumer)
309 }
310
311 fn drain(track: moq_net::TrackConsumer) -> Vec<Value> {
313 let mut consumer = Consumer::<Value>::new(track);
314 let waiter = kio::Waiter::noop();
315 let mut out = Vec::new();
316 while let Poll::Ready(Ok(Some(value))) = consumer.poll_next(&waiter) {
317 out.push(value);
318 }
319 out
320 }
321
322 #[test]
323 fn deltas_off_snapshot_per_group() {
324 let (mut producer, track) = producer(Config::default());
325 producer.update(&json!({ "a": 1 })).unwrap();
326 producer.update(&json!({ "a": 2 })).unwrap();
327 producer.finish().unwrap();
328
329 assert_eq!(track.latest(), Some(1));
332 assert_eq!(drain(track), vec![json!({ "a": 2 })]);
333 }
334
335 #[test]
336 fn live_consumer_sees_each_update() {
337 let (mut producer, track) = producer(Config::default());
338 let mut consumer = Consumer::<Value>::new(track);
339 let waiter = kio::Waiter::noop();
340
341 for n in 1..=3 {
342 producer.update(&json!({ "a": n })).unwrap();
343 match consumer.poll_next(&waiter) {
344 Poll::Ready(Ok(Some(value))) => assert_eq!(value, json!({ "a": n })),
345 other => panic!("expected value, got {other:?}"),
346 }
347 }
348 }
349
350 #[test]
351 fn unchanged_value_writes_nothing() {
352 let (mut producer, track) = producer(Config::default());
353 producer.update(&json!({ "a": 1 })).unwrap();
354 producer.update(&json!({ "a": 1 })).unwrap();
355 producer.finish().unwrap();
356
357 assert_eq!(track.latest(), Some(0));
358 assert_eq!(drain(track), vec![json!({ "a": 1 })]);
359 }
360
361 #[test]
362 fn deltas_share_one_group() {
363 let config = Config {
364 delta_ratio: Some(100.0),
365 };
366 let (mut producer, track) = producer(config);
367 producer.update(&json!({ "a": 1, "b": 1 })).unwrap();
368 producer.update(&json!({ "a": 1, "b": 2 })).unwrap();
369 producer.update(&json!({ "a": 1, "b": 3 })).unwrap();
370 producer.finish().unwrap();
371
372 assert_eq!(track.latest(), Some(0));
374 let values = drain(track);
375 assert_eq!(values.last().unwrap(), &json!({ "a": 1, "b": 3 }));
376 }
377
378 #[test]
379 fn tight_ratio_rolls_snapshots() {
380 let config = Config { delta_ratio: Some(1.0) };
382 let (mut producer, track) = producer(config);
383 producer.update(&json!({ "a": 1 })).unwrap();
384 producer.update(&json!({ "a": 2 })).unwrap();
385 producer.update(&json!({ "a": 3 })).unwrap();
386 producer.finish().unwrap();
387
388 assert_eq!(track.latest(), Some(2));
389 }
390
391 #[test]
392 fn array_change_is_delta() {
393 let config = Config {
394 delta_ratio: Some(100.0),
395 };
396 let (mut producer, track) = producer(config);
397 producer.update(&json!({ "list": [1, 2] })).unwrap();
398 producer.update(&json!({ "list": [1, 2, 3] })).unwrap();
399 producer.finish().unwrap();
400
401 assert_eq!(track.latest(), Some(0));
403 assert_eq!(drain(track).last().unwrap(), &json!({ "list": [1, 2, 3] }));
404 }
405
406 #[test]
407 fn frame_cap_rolls_snapshot() {
408 let config = Config {
409 delta_ratio: Some(1_000_000.0),
410 };
411 let (mut producer, track) = producer(config);
412 for i in 0..=MAX_DELTA_FRAMES {
414 producer.update(&json!({ "n": i })).unwrap();
415 }
416 producer.finish().unwrap();
417
418 assert_eq!(track.latest(), Some(1));
420 assert_eq!(drain(track).last().unwrap(), &json!({ "n": MAX_DELTA_FRAMES }));
421 }
422
423 #[test]
424 fn late_joiner_reconstructs_from_deltas() {
425 let config = Config {
426 delta_ratio: Some(100.0),
427 };
428 let (mut producer, track) = producer(config);
429 producer.update(&json!({ "a": 1, "b": 1 })).unwrap();
430 producer.update(&json!({ "a": 1, "b": 2 })).unwrap();
431 producer.update(&json!({ "a": 5, "b": 2 })).unwrap();
432 producer.finish().unwrap();
433
434 assert_eq!(drain(track).last().unwrap(), &json!({ "a": 5, "b": 2 }));
436 }
437
438 #[test]
439 fn newer_group_supersedes_in_progress_reconstruction() {
440 let config = Config { delta_ratio: Some(2.0) };
442 let (mut producer, track) = producer(config);
443 let observer = producer.consume();
444 let mut consumer = Consumer::<Value>::new(track);
445 let waiter = kio::Waiter::noop();
446
447 producer.update(&json!({ "a": 1 })).unwrap(); match consumer.poll_next(&waiter) {
449 Poll::Ready(Ok(Some(value))) => assert_eq!(value, json!({ "a": 1 })),
450 other => panic!("expected first value, got {other:?}"),
451 }
452
453 producer.update(&json!({ "a": 2 })).unwrap(); producer.update(&json!({ "a": 3 })).unwrap(); producer.finish().unwrap();
456 assert_eq!(observer.latest(), Some(1));
457
458 let mut last = None;
460 while let Poll::Ready(Ok(Some(value))) = consumer.poll_next(&waiter) {
461 last = Some(value);
462 }
463 assert_eq!(last.unwrap(), json!({ "a": 3 }));
464 }
465}