use std::task::{Poll, ready};
use super::{Catalog, CatalogExt};
use crate::Result;
/// Consumer of [`Catalog`] values, implemented as a thin wrapper around
/// `moq_json::Consumer<Catalog<E>>`.
///
/// `E` is the `CatalogExt` type parameter of `Catalog<E>`; it defaults to `()`.
pub struct Consumer<E: CatalogExt = ()> {
// Underlying `moq_json` consumer, parameterized over `Catalog<E>`.
inner: moq_json::Consumer<Catalog<E>>,
}
// NOTE(review): presumably written by hand rather than with `#[derive(Clone)]`
// so that the impl does not pick up the extra `E: Clone` bound a derive adds.
impl<E: CatalogExt> Clone for Consumer<E> {
    /// Returns a new `Consumer` holding a clone of the wrapped inner consumer.
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        Self { inner }
    }
}
impl<E: CatalogExt> Consumer<E> {
    /// Creates a catalog consumer on top of `track`.
    ///
    /// The track is handed to `moq_json::Consumer::new`, which this type wraps.
    pub fn new(track: moq_net::TrackConsumer) -> Self {
        let inner = moq_json::Consumer::new(track);
        Self { inner }
    }

    /// Polls the wrapped consumer for the next catalog.
    ///
    /// `waiter` is forwarded unchanged to the inner `poll_next`. While the
    /// inner consumer is pending this returns [`Poll::Pending`]. Otherwise the
    /// inner result is passed through, with any error converted into this
    /// crate's error type.
    ///
    /// As exercised by this module's tests, `Ok(None)` is observed once the
    /// track has finished and nothing further is available.
    pub fn poll_next(&mut self, waiter: &kio::Waiter) -> Poll<Result<Option<Catalog<E>>>> {
        // `ready!` returns `Poll::Pending` early when the inner poll is pending.
        match ready!(self.inner.poll_next(waiter)) {
            Ok(update) => Poll::Ready(Ok(update)),
            Err(err) => Poll::Ready(Err(err.into())),
        }
    }

    /// Async counterpart of [`Consumer::poll_next`].
    ///
    /// Awaits the inner consumer's `next()` and returns its value, converting
    /// any error into this crate's error type.
    pub async fn next(&mut self) -> Result<Option<Catalog<E>>>
    where
        Catalog<E>: Unpin,
    {
        let update = self.inner.next().await?;
        Ok(update)
    }
}
impl<E: CatalogExt> From<moq_net::TrackConsumer> for Consumer<E> {
fn from(inner: moq_net::TrackConsumer) -> Self {
Self::new(inner)
}
}
// Unit tests for `Consumer::poll_next`. Each test drives a catalog track
// producer by hand and polls the consumer with a `kio::Waiter::noop()` waiter,
// so the order of producer writes and consumer polls is the point of each test.
#[cfg(test)]
mod test {
use std::task::Poll;
use super::*;
// Builds a default `Catalog` with a single Opus audio rendition registered
// under `name`, and returns it together with its JSON serialization.
// NOTE(review): `48_000, 2` are presumably sample rate and channel count —
// confirm against `AudioConfig::new`.
fn catalog_payload(name: &str) -> (Catalog, String) {
let mut catalog = Catalog::default();
catalog.audio.renditions.insert(
name.to_string(),
hang::catalog::AudioConfig::new(hang::catalog::AudioCodec::Opus, 48_000, 2),
);
let payload = serde_json::to_string(&catalog).expect("catalog should serialize");
(catalog, payload)
}
// Unwraps a poll result that must be `Poll::Ready(Ok(Some(_)))`; any other
// outcome (pending, error, or end of stream) fails the test with a panic.
fn expect_catalog(result: Poll<Result<Option<Catalog>>>) -> Catalog {
match result {
Poll::Ready(Ok(Some(decoded))) => decoded,
other => panic!("expected catalog payload, got {other:?}"),
}
}
// A group that has been appended but has no frame yet leaves the consumer
// pending; once the frame is written and the group finished, the catalog is
// returned.
#[test]
fn waits_for_pending_catalog_group_payload() {
let mut track = hang::Catalog::default_track().produce();
let mut consumer = Consumer::new(track.consume());
let mut group = track.append_group().expect("catalog group should append");
let waiter = kio::Waiter::noop();
assert!(matches!(consumer.poll_next(&waiter), Poll::Pending));
let (catalog, payload) = catalog_payload("pending");
group.write_frame(payload).expect("catalog frame should write");
group.finish().expect("catalog group should finish");
assert_eq!(expect_catalog(consumer.poll_next(&waiter)), catalog);
}
// Same as above, but the track is finished before the group's frame is
// written: the consumer must still wait for the outstanding group instead of
// reporting the end of the stream.
#[test]
fn waits_for_pending_catalog_group_payload_after_track_finish() {
let mut track = hang::Catalog::default_track().produce();
let mut consumer = Consumer::new(track.consume());
let mut group = track.append_group().expect("catalog group should append");
track.finish().expect("catalog track should finish");
let waiter = kio::Waiter::noop();
assert!(matches!(consumer.poll_next(&waiter), Poll::Pending));
let (catalog, payload) = catalog_payload("finished");
group.write_frame(payload).expect("catalog frame should write");
group.finish().expect("catalog group should finish");
assert_eq!(expect_catalog(consumer.poll_next(&waiter)), catalog);
}
// With two complete groups written before the first poll, only the newest
// catalog is returned; the following poll reports `Ok(None)` because the
// track has finished.
#[test]
fn returns_latest_complete_catalog_group() {
let mut track = hang::Catalog::default_track().produce();
let mut consumer = Consumer::new(track.consume());
let waiter = kio::Waiter::noop();
let (_old, old_payload) = catalog_payload("old");
let (latest, latest_payload) = catalog_payload("latest");
let mut old_group = track.append_group().expect("old catalog group should append");
old_group
.write_frame(old_payload)
.expect("old catalog frame should write");
old_group.finish().expect("old catalog group should finish");
let mut latest_group = track.append_group().expect("latest catalog group should append");
latest_group
.write_frame(latest_payload)
.expect("latest catalog frame should write");
latest_group.finish().expect("latest catalog group should finish");
track.finish().expect("catalog track should finish");
assert_eq!(expect_catalog(consumer.poll_next(&waiter)), latest);
assert!(matches!(consumer.poll_next(&waiter), Poll::Ready(Ok(None))));
}
// An older group is complete, but a newer group has been appended without a
// frame: the consumer stays pending rather than returning the older catalog,
// then returns the newer catalog once it is written.
#[test]
fn waits_for_newer_pending_group_instead_of_returning_older_ready_group() {
let mut track = hang::Catalog::default_track().produce();
let mut consumer = Consumer::new(track.consume());
let waiter = kio::Waiter::noop();
let (_old, old_payload) = catalog_payload("old");
let (latest, latest_payload) = catalog_payload("latest");
let mut old_group = track.append_group().expect("old catalog group should append");
old_group
.write_frame(old_payload)
.expect("old catalog frame should write");
old_group.finish().expect("old catalog group should finish");
let mut latest_group = track.append_group().expect("latest catalog group should append");
assert!(matches!(consumer.poll_next(&waiter), Poll::Pending));
latest_group
.write_frame(latest_payload)
.expect("latest catalog frame should write");
latest_group.finish().expect("latest catalog group should finish");
assert_eq!(expect_catalog(consumer.poll_next(&waiter)), latest);
}
// The consumer is first polled while only an empty older group exists
// (pending). A newer group is then completed and the track finished, so the
// newer catalog is returned. The older group's payload is written only
// afterwards and must not be delivered: the final poll reports `Ok(None)`.
#[test]
fn retained_pending_group_is_superseded_by_newer_group() {
let mut track = hang::Catalog::default_track().produce();
let mut consumer = Consumer::new(track.consume());
let waiter = kio::Waiter::noop();
let (_old, old_payload) = catalog_payload("old");
let (latest, latest_payload) = catalog_payload("latest");
let mut old_group = track.append_group().expect("old catalog group should append");
assert!(matches!(consumer.poll_next(&waiter), Poll::Pending));
let mut latest_group = track.append_group().expect("latest catalog group should append");
latest_group
.write_frame(latest_payload)
.expect("latest catalog frame should write");
latest_group.finish().expect("latest catalog group should finish");
track.finish().expect("catalog track should finish");
assert_eq!(expect_catalog(consumer.poll_next(&waiter)), latest);
old_group
.write_frame(old_payload)
.expect("old catalog frame should write");
old_group.finish().expect("old catalog group should finish");
assert!(matches!(consumer.poll_next(&waiter), Poll::Ready(Ok(None))));
}
// A track that finishes without any group yields `Ok(None)` immediately.
#[test]
fn returns_none_when_empty_track_finishes() {
let mut track = hang::Catalog::default_track().produce();
// The explicit `Consumer` annotation selects the default `E = ()`, since no
// other expression in this test pins the type parameter.
let mut consumer: Consumer = Consumer::new(track.consume());
let waiter = kio::Waiter::noop();
track.finish().expect("catalog track should finish");
assert!(matches!(consumer.poll_next(&waiter), Poll::Ready(Ok(None))));
}
}