use rtp::packet::Packet;
use tokio::sync::mpsc;
use tokio::time::Duration;
use util::Unmarshal;
use waitgroup::WaitGroup;
use super::*;
use crate::mock::mock_stream::MockStream;
use crate::stream_info::RTPHeaderExtension;
/// Checks that the TWCC `Sender` interceptor attaches a transport-cc header
/// extension to every RTP packet written through it.
///
/// Several `MockStream`s are bound to the *same* interceptor instance and
/// driven from separate tasks (presumably to exercise concurrent use of the
/// shared interceptor). Each task writes a handful of packets, checks that the
/// packet read back from the stream carries the RTP sequence number it wrote,
/// and forwards it over a channel. The test body then verifies that header
/// extension id 1 of every forwarded packet unmarshals as a
/// `TransportCcExtension`.
///
/// The ordering of packets across streams is not checked; only the presence
/// of a decodable extension on each packet and the total packet count are.
///
/// # Errors
///
/// Returns an error if the interceptor cannot be built or if an extension
/// payload fails to unmarshal.
///
/// # Panics
///
/// Panics if any stream task panics, if a packet lacks header extension id 1,
/// or if fewer packets than expected are received.
#[tokio::test]
async fn test_twcc_sender_interceptor() -> Result<()> {
    const STREAM_COUNT: u16 = 10;
    const PACKETS_PER_STREAM: u16 = 5;

    let expected_packets = usize::from(STREAM_COUNT) * usize::from(PACKETS_PER_STREAM);

    let builder = Sender::builder().with_init_sequence_nr(0);
    let icpr = builder.build("")?;

    // The channel is sized to hold every packet so that senders never block.
    let (p_chan_tx, mut p_chan_rx) = mpsc::channel::<Packet>(expected_packets);

    // The producer task owns the original sender; the channel closes (and the
    // receive loop below terminates) once it and all stream tasks are done.
    let producer = tokio::spawn(async move {
        let wg = WaitGroup::new();
        let mut stream_tasks = Vec::with_capacity(usize::from(STREAM_COUNT));

        for i in 0..STREAM_COUNT {
            let w = wg.worker();
            let p_chan_tx2 = p_chan_tx.clone();
            let icpr2 = Arc::clone(&icpr);

            stream_tasks.push(tokio::spawn(async move {
                // Hold the worker guard for the lifetime of the task; it is
                // released on drop, including when the task panics.
                let _d = w;

                let stream = MockStream::new(
                    &StreamInfo {
                        rtp_header_extensions: vec![RTPHeaderExtension {
                            uri: TRANSPORT_CC_URI.to_owned(),
                            id: 1,
                        }],
                        ..Default::default()
                    },
                    icpr2,
                )
                .await;

                let id = i + 1;
                for multiplier in 1..=PACKETS_PER_STREAM {
                    let seq_num = id * multiplier;

                    stream
                        .write_rtp(&rtp::packet::Packet {
                            header: rtp::header::Header {
                                sequence_number: seq_num,
                                ..Default::default()
                            },
                            ..Default::default()
                        })
                        .await
                        .unwrap();

                    // Wait a bounded time for the packet to come out the other
                    // side of the interceptor.
                    let p = tokio::time::timeout(Duration::from_millis(10), stream.written_rtp())
                        .await
                        .expect("written rtp packet not found")
                        .expect("stream.written_rtp none");
                    assert_eq!(p.header.sequence_number, seq_num);

                    // Best effort: the receiver only goes away if the test body
                    // has already bailed out with an error.
                    let _ = p_chan_tx2.send(p).await;
                }

                let _ = stream.close().await;
            }));
        }

        wg.wait().await;

        // A panic inside a spawned task is captured by its JoinHandle rather
        // than failing the test, so surface it explicitly here.
        for task in stream_tasks {
            task.await.expect("stream task panicked");
        }
    });

    let mut received = 0usize;
    while let Some(p) = p_chan_rx.recv().await {
        let mut extension_header = p
            .header
            .get_extension(1)
            .expect("packet is missing header extension id 1");
        let _twcc = TransportCcExtension::unmarshal(&mut extension_header)?;
        received += 1;
    }

    // Propagate any failure from the producer (and thus from the stream tasks)
    // before checking that nothing was silently dropped.
    producer.await.expect("producer task panicked");
    assert_eq!(received, expected_packets);

    Ok(())
}