1use anyhow::{ensure, Result};
2use iroh::endpoint::Connection;
3use iroh_quinn_proto::coding::Codec;
4use iroh_quinn_proto::VarInt;
5use rtp::packet::Packet as RtpPacket;
6use tokio_util::bytes::BytesMut;
7use tokio_util::sync::CancellationToken;
8use tracing::debug;
9use webrtc_util::marshal::{Marshal, MarshalSize};
10
11use crate::SendStream;
12
13#[derive(Clone, Debug)]
15pub struct SendFlow {
16 id: VarInt,
17 conn: Connection,
18 cancel_token: CancellationToken,
19}
20
21impl SendFlow {
22 pub(crate) fn new(conn: Connection, id: VarInt, cancel_token: CancellationToken) -> Self {
23 Self {
24 id,
25 conn,
26 cancel_token,
27 }
28 }
29
30 pub fn flow_id(&self) -> VarInt {
32 self.id
33 }
34
35 pub fn send_rtp(&self, packet: &RtpPacket) -> Result<()> {
39 ensure!(!self.cancel_token.is_cancelled(), "flow is closed");
40
41 debug!(flow_id = %self.id, "send datagram RTP packet");
42
43 let mut buf = BytesMut::new();
44 self.id.encode(&mut buf);
45 let marshal_size = packet.marshal_size();
46 let id_len = buf.len();
47 buf.resize(id_len + marshal_size, 0);
48 let n = packet.marshal_to(&mut buf[id_len..])?;
49 ensure!(n == marshal_size, "inconsistent packet marshal");
50
51 self.conn.send_datagram(buf.freeze())?;
52
53 Ok(())
54 }
55
56 pub async fn new_send_stream(&self) -> Result<SendStream> {
60 ensure!(!self.cancel_token.is_cancelled(), "flow is closed");
61
62 let stream = self.conn.open_uni().await?;
63
64 debug!(flow_id = %self.id, "opened send stream");
65
66 Ok(SendStream::new(
67 self.id,
68 stream,
69 self.cancel_token.child_token(),
70 ))
71 }
72
73 pub fn close(&self) {
75 self.cancel_token.cancel();
76 }
77
78 pub fn is_closed(&self) -> bool {
80 self.cancel_token.is_cancelled()
81 }
82}