1use anyhow::{ensure, Result};
2use iroh_quinn_proto::coding::Codec;
3use iroh_quinn_proto::VarInt;
4use rtp::packet::Packet as RtpPacket;
5use tokio_util::bytes::BytesMut;
6use tokio_util::sync::CancellationToken;
7use tracing::debug;
8use webrtc_util::marshal::{Marshal, MarshalSize};
9
10#[derive(Debug)]
12pub struct SendStream {
13 id: VarInt,
14 stream: iroh::endpoint::SendStream,
15 cancel_token: CancellationToken,
16 sent_flow_id: bool,
20}
21
22impl SendStream {
23 pub(crate) fn new(
24 id: VarInt,
25 stream: iroh::endpoint::SendStream,
26 cancel_token: CancellationToken,
27 ) -> Self {
28 Self {
29 id,
30 stream,
31 cancel_token,
32 sent_flow_id: false,
33 }
34 }
35
36 pub fn flow_id(&self) -> VarInt {
38 self.id
39 }
40
41 pub async fn send_rtp(&mut self, packet: &RtpPacket) -> Result<()> {
43 ensure!(!self.cancel_token.is_cancelled(), "send stream is closed");
44 debug!(flow_id = %self.id, "send stream RTP packet");
45
46 let mut buf = BytesMut::new();
47
48 if !self.sent_flow_id {
49 self.id.encode(&mut buf);
51 self.sent_flow_id = true;
52 }
53
54 let marshal_size = packet.marshal_size();
56 let packet_size = VarInt::try_from(marshal_size)?;
57 packet_size.encode(&mut buf);
58
59 let existing_len = buf.len();
61 buf.resize(existing_len + marshal_size, 0);
62 let n = packet.marshal_to(&mut buf[existing_len..])?;
63 ensure!(n == marshal_size, "inconsistent packet marshal");
64
65 self.stream.write_all(&buf[..]).await?;
66
67 Ok(())
68 }
69
70 pub fn close(&self) {
72 self.cancel_token.cancel();
73 }
74
75 pub fn is_closed(&self) -> bool {
77 self.cancel_token.is_cancelled()
78 }
79}