use bytes::Bytes;
use moq_native::moq_lite::{Broadcast, Origin, Track};
use std::time::Duration;
use tokio::time::timeout;
#[tokio::test]
async fn test_local_pubsub() {
let origin = Origin::produce();
let mut broadcast = Broadcast::produce();
let mut track = broadcast.producer.create_track(Track {
name: "serial-data".to_string(),
priority: 0,
});
origin
.producer
.publish_broadcast("test", broadcast.consumer.clone());
let mut consumer = origin.producer.consume();
let (path, broadcast_consumer) = timeout(Duration::from_secs(1), consumer.announced())
.await
.expect("timeout waiting for announcement")
.expect("origin closed");
assert_eq!(path.as_str(), "test");
let broadcast_consumer = broadcast_consumer.expect("broadcast should be announced");
let track_info = Track {
name: "serial-data".to_string(),
priority: 0,
};
let mut track_consumer = broadcast_consumer.subscribe_track(&track_info);
let test_data = "Hello from serial port!";
track.write_frame(Bytes::from(test_data));
let group = timeout(Duration::from_secs(1), track_consumer.next_group())
.await
.expect("timeout waiting for group")
.expect("error reading group")
.expect("track closed");
assert_eq!(group.info.sequence, 0);
let mut group = group;
let frame = timeout(Duration::from_secs(1), group.read_frame())
.await
.expect("timeout waiting for frame")
.expect("error reading frame")
.expect("group closed");
assert_eq!(frame.as_ref(), test_data.as_bytes());
println!("Successfully received: {}", String::from_utf8_lossy(&frame));
}
#[tokio::test]
async fn test_multiple_groups() {
let origin = Origin::produce();
let mut broadcast = Broadcast::produce();
let mut track = broadcast.producer.create_track(Track {
name: "multi-group".to_string(),
priority: 0,
});
origin
.producer
.publish_broadcast("", broadcast.consumer.clone());
let mut consumer = origin.producer.consume();
let (_, bc) = consumer.announced().await.unwrap();
let bc = bc.unwrap();
let mut track_consumer = bc.subscribe_track(&Track {
name: "multi-group".to_string(),
priority: 0,
});
for i in 0..5 {
track.write_frame(Bytes::from(format!("Frame {}", i)));
let group = track_consumer.next_group().await.unwrap().unwrap();
assert_eq!(group.info.sequence, i as u64);
let mut group = group;
let frame = group.read_frame().await.unwrap().unwrap();
let expected = format!("Frame {}", i);
assert_eq!(frame.as_ref(), expected.as_bytes());
println!("Received group {}: {}", i, String::from_utf8_lossy(&frame));
}
}
#[tokio::test]
async fn test_group_with_multiple_frames() {
let origin = Origin::produce();
let mut broadcast = Broadcast::produce();
let mut track = broadcast.producer.create_track(Track {
name: "group-frames".to_string(),
priority: 0,
});
origin
.producer
.publish_broadcast("", broadcast.consumer.clone());
let mut consumer = origin.producer.consume();
let (_, bc) = consumer.announced().await.unwrap();
let bc = bc.unwrap();
let mut track_consumer = bc.subscribe_track(&Track {
name: "group-frames".to_string(),
priority: 0,
});
let mut group = track.append_group();
group.write_frame(Bytes::from("First"));
group.write_frame(Bytes::from("Second"));
group.write_frame(Bytes::from("Third"));
group.close();
let recv_group = track_consumer.next_group().await.unwrap().unwrap();
let mut recv_group = recv_group;
let f1 = recv_group.read_frame().await.unwrap().unwrap();
assert_eq!(f1.as_ref(), b"First");
let f2 = recv_group.read_frame().await.unwrap().unwrap();
assert_eq!(f2.as_ref(), b"Second");
let f3 = recv_group.read_frame().await.unwrap().unwrap();
assert_eq!(f3.as_ref(), b"Third");
let f4 = recv_group.read_frame().await.unwrap();
assert!(f4.is_none());
println!("Successfully read all frames from group");
}
#[tokio::test]
async fn test_broadcast_unannounce() {
let origin = Origin::produce();
let broadcast = Broadcast::produce();
origin
.producer
.publish_broadcast("temp", broadcast.consumer.clone());
let mut consumer = origin.producer.consume();
let (path, bc) = consumer.announced().await.unwrap();
assert_eq!(path.as_str(), "temp");
assert!(bc.is_some());
drop(broadcast.producer);
tokio::time::sleep(Duration::from_millis(10)).await;
let result = timeout(Duration::from_millis(100), consumer.announced()).await;
if let Ok(Some((path, bc))) = result {
assert_eq!(path.as_str(), "temp");
assert!(bc.is_none(), "should be unannounced");
println!("Broadcast correctly unannounced");
}
}