iroh_roq/
send_flow.rs

1use anyhow::{ensure, Result};
2use iroh::endpoint::Connection;
3use iroh_quinn_proto::coding::Codec;
4use iroh_quinn_proto::VarInt;
5use rtp::packet::Packet as RtpPacket;
6use tokio_util::bytes::BytesMut;
7use tokio_util::sync::CancellationToken;
8use tracing::debug;
9use webrtc_util::marshal::{Marshal, MarshalSize};
10
11use crate::SendStream;
12
13/// The sending side of an RTP flow.
14#[derive(Clone, Debug)]
15pub struct SendFlow {
16    id: VarInt,
17    conn: Connection,
18    cancel_token: CancellationToken,
19}
20
21impl SendFlow {
22    pub(crate) fn new(conn: Connection, id: VarInt, cancel_token: CancellationToken) -> Self {
23        Self {
24            id,
25            conn,
26            cancel_token,
27        }
28    }
29
30    /// Returns the flow ID for this `SendFlow`.
31    pub fn flow_id(&self) -> VarInt {
32        self.id
33    }
34
35    /// Send the given RTP packet.
36    ///
37    /// This will use the datagram based path.
38    pub fn send_rtp(&self, packet: &RtpPacket) -> Result<()> {
39        ensure!(!self.cancel_token.is_cancelled(), "flow is closed");
40
41        debug!(flow_id = %self.id, "send datagram RTP packet");
42
43        let mut buf = BytesMut::new();
44        self.id.encode(&mut buf);
45        let marshal_size = packet.marshal_size();
46        let id_len = buf.len();
47        buf.resize(id_len + marshal_size, 0);
48        let n = packet.marshal_to(&mut buf[id_len..])?;
49        ensure!(n == marshal_size, "inconsistent packet marshal");
50
51        self.conn.send_datagram(buf.freeze())?;
52
53        Ok(())
54    }
55
56    /// Creates a new `SendStream`.
57    ///
58    /// This will use the stream based path.
59    pub async fn new_send_stream(&self) -> Result<SendStream> {
60        ensure!(!self.cancel_token.is_cancelled(), "flow is closed");
61
62        let stream = self.conn.open_uni().await?;
63
64        debug!(flow_id = %self.id, "opened send stream");
65
66        Ok(SendStream::new(
67            self.id,
68            stream,
69            self.cancel_token.child_token(),
70        ))
71    }
72
73    /// Close this flow
74    pub fn close(&self) {
75        self.cancel_token.cancel();
76    }
77
78    /// Is this flow closed?
79    pub fn is_closed(&self) -> bool {
80        self.cancel_token.is_cancelled()
81    }
82}