iroh_roq/
send_stream.rs

1use anyhow::{ensure, Result};
2use iroh_quinn_proto::coding::Codec;
3use iroh_quinn_proto::VarInt;
4use rtp::packet::Packet as RtpPacket;
5use tokio_util::bytes::BytesMut;
6use tokio_util::sync::CancellationToken;
7use tracing::debug;
8use webrtc_util::marshal::{Marshal, MarshalSize};
9
10/// Manages sending on a specific send flow.
11#[derive(Debug)]
12pub struct SendStream {
13    id: VarInt,
14    stream: iroh::endpoint::SendStream,
15    cancel_token: CancellationToken,
16    /// Did we already send the flow id?
17    /// This gets sent as the very first message, but we do this lazily
18    /// when sending out the first acutal packet.
19    sent_flow_id: bool,
20}
21
22impl SendStream {
23    pub(crate) fn new(
24        id: VarInt,
25        stream: iroh::endpoint::SendStream,
26        cancel_token: CancellationToken,
27    ) -> Self {
28        Self {
29            id,
30            stream,
31            cancel_token,
32            sent_flow_id: false,
33        }
34    }
35
36    /// Returns the flow ID for this `SendStream`.
37    pub fn flow_id(&self) -> VarInt {
38        self.id
39    }
40
41    /// Send an RTP packet on this flow.
42    pub async fn send_rtp(&mut self, packet: &RtpPacket) -> Result<()> {
43        ensure!(!self.cancel_token.is_cancelled(), "send stream is closed");
44        debug!(flow_id = %self.id, "send stream RTP packet");
45
46        let mut buf = BytesMut::new();
47
48        if !self.sent_flow_id {
49            // send the flow ID for the first packet
50            self.id.encode(&mut buf);
51            self.sent_flow_id = true;
52        }
53
54        // packets are prefixed with their length as varint
55        let marshal_size = packet.marshal_size();
56        let packet_size = VarInt::try_from(marshal_size)?;
57        packet_size.encode(&mut buf);
58
59        // the actual packet
60        let existing_len = buf.len();
61        buf.resize(existing_len + marshal_size, 0);
62        let n = packet.marshal_to(&mut buf[existing_len..])?;
63        ensure!(n == marshal_size, "inconsistent packet marshal");
64
65        self.stream.write_all(&buf[..]).await?;
66
67        Ok(())
68    }
69
70    /// Close this send stream.
71    pub fn close(&self) {
72        self.cancel_token.cancel();
73    }
74
75    /// Is this send stream closed?
76    pub fn is_closed(&self) -> bool {
77        self.cancel_token.is_cancelled()
78    }
79}