Skip to main content

gosuto_livekit/room/data_stream/
outgoing.rs

1// Copyright 2025 LiveKit, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::{
16    ByteStreamInfo, OperationType, StreamError, StreamProgress, StreamResult, TextStreamInfo,
17};
18use crate::{
19    id::ParticipantIdentity, rtc_engine::EngineError, utils::utf8_chunk::Utf8AwareChunkExt,
20};
21use bmrng::unbounded::{UnboundedRequestReceiver, UnboundedRequestSender};
22use chrono::Utc;
23use gosuto_libwebrtc::native::create_random_uuid;
24use livekit_protocol as proto;
25use std::{collections::HashMap, path::Path, sync::Arc};
26use tokio::{io::AsyncReadExt, sync::Mutex};
27
/// Writer for an open data stream.
///
/// Implemented in this file by [`ByteStreamWriter`] (which accepts `&[u8]`)
/// and [`TextStreamWriter`] (which accepts `&str`). The lifetime `'a` is the
/// lifetime of the borrowed input passed to [`StreamWriter::write`].
pub trait StreamWriter<'a> {
    /// Type of input this writer accepts.
    ///
    /// Bounded by `'a` so implementors can use a borrowed type such as
    /// `&'a [u8]` or `&'a str`.
    type Input: 'a;

    /// Information about the underlying data stream.
    type Info;

    /// Returns a reference to the stream info.
    fn info(&self) -> &Self::Info;

    /// Writes to the stream.
    ///
    /// Takes the writer by shared reference, so it can be called repeatedly.
    /// The returned future must be `Send`.
    fn write(
        &self,
        input: Self::Input,
    ) -> impl std::future::Future<Output = StreamResult<()>> + Send;

    /// Closes the stream normally.
    ///
    /// Consumes the writer. The returned future must be `Send`.
    fn close(self) -> impl std::future::Future<Output = StreamResult<()>> + Send;

    /// Closes the stream abnormally, specifying the reason for closure.
    ///
    /// Consumes the writer. The returned future must be `Send`.
    fn close_with_reason(
        self,
        reason: &str,
    ) -> impl std::future::Future<Output = StreamResult<()>> + Send;
}
54
/// Writer for an open byte data stream.
///
/// Both fields are reference-counted, so clones of a writer share the same
/// stream info and the same underlying raw stream.
#[derive(Clone)]
pub struct ByteStreamWriter {
    /// Information describing the stream, shared between clones.
    info: Arc<ByteStreamInfo>,
    /// Underlying raw stream; the async mutex is held while chunks are
    /// written or the stream is closed.
    stream: Arc<Mutex<RawStream>>,
}
61
/// Writer for an open text data stream.
///
/// Both fields are reference-counted, so clones of a writer share the same
/// stream info and the same underlying raw stream.
#[derive(Clone)]
pub struct TextStreamWriter {
    /// Information describing the stream, shared between clones.
    info: Arc<TextStreamInfo>,
    /// Underlying raw stream; the async mutex is held while chunks are
    /// written or the stream is closed.
    stream: Arc<Mutex<RawStream>>,
}
68
69impl<'a> StreamWriter<'a> for ByteStreamWriter {
70    type Input = &'a [u8];
71    type Info = ByteStreamInfo;
72
73    fn info(&self) -> &Self::Info {
74        &self.info
75    }
76
77    async fn write(&self, bytes: &'a [u8]) -> StreamResult<()> {
78        let mut stream = self.stream.lock().await;
79        for chunk in bytes.chunks(CHUNK_SIZE) {
80            stream.write_chunk(chunk).await?;
81        }
82        Ok(())
83    }
84
85    async fn close(self) -> StreamResult<()> {
86        self.stream.lock().await.close(None).await
87    }
88
89    async fn close_with_reason(self, reason: &str) -> StreamResult<()> {
90        self.stream.lock().await.close(Some(reason)).await
91    }
92}
93
94impl ByteStreamWriter {
95    /// Writes the contents of the file incrementally.
96    async fn write_file_contents(&self, path: impl AsRef<Path>) -> StreamResult<()> {
97        let mut stream = self.stream.lock().await;
98        let mut file = tokio::fs::File::open(path).await?;
99        let mut buffer = vec![0; 8192]; // 8KB
100        loop {
101            let bytes_read = file.read(&mut buffer).await?;
102            if bytes_read == 0 {
103                break;
104            }
105            stream.write_chunk(&buffer[..bytes_read]).await?;
106        }
107        Ok(())
108    }
109}
110
111impl<'a> StreamWriter<'a> for TextStreamWriter {
112    type Input = &'a str;
113    type Info = TextStreamInfo;
114
115    fn info(&self) -> &Self::Info {
116        &self.info
117    }
118
119    async fn write(&self, text: &'a str) -> StreamResult<()> {
120        let mut stream = self.stream.lock().await;
121        for chunk in text.as_bytes().utf8_aware_chunks(CHUNK_SIZE) {
122            stream.write_chunk(chunk).await?;
123        }
124        Ok(())
125    }
126
127    async fn close(self) -> StreamResult<()> {
128        self.stream.lock().await.close(None).await
129    }
130
131    async fn close_with_reason(self, reason: &str) -> StreamResult<()> {
132        self.stream.lock().await.close(Some(reason)).await
133    }
134}
135
/// Everything needed to open a [`RawStream`].
struct RawStreamOpenOptions {
    /// Stream header; sent as the stream's first packet and the source of the
    /// stream ID and total length.
    header: proto::data_stream::Header,
    /// Identities the header packet is addressed to.
    destination_identities: Vec<ParticipantIdentity>,
    /// Request channel for sending packets; kept by the opened stream.
    packet_tx: UnboundedRequestSender<proto::DataPacket, Result<(), EngineError>>,
}
141
/// Untyped outgoing stream shared by the byte and text writers.
///
/// Sends a header packet when opened, one packet per chunk written, and a
/// trailer packet when closed.
struct RawStream {
    /// Stream ID, copied from the header and stamped on every chunk and on the
    /// trailer.
    id: String,
    /// Running chunk index and byte counters for the stream.
    progress: StreamProgress,
    /// Set once a trailer packet has been sent successfully by `close`.
    is_closed: bool,
    /// Request channel for sending packets.
    packet_tx: UnboundedRequestSender<proto::DataPacket, Result<(), EngineError>>,
}
149
150impl RawStream {
151    async fn open(options: RawStreamOpenOptions) -> StreamResult<Self> {
152        let id = options.header.stream_id.to_string();
153        let bytes_total = options.header.total_length;
154
155        let packet = Self::create_header_packet(options.header, options.destination_identities);
156        Self::send_packet(&options.packet_tx, packet).await?;
157
158        Ok(Self {
159            id,
160            progress: StreamProgress { bytes_total, ..Default::default() },
161            is_closed: false,
162            packet_tx: options.packet_tx,
163        })
164    }
165
166    async fn write_chunk(&mut self, bytes: &[u8]) -> StreamResult<()> {
167        let packet = Self::create_chunk_packet(&self.id, self.progress.chunk_index, bytes);
168        Self::send_packet(&self.packet_tx, packet).await?;
169        self.progress.bytes_processed += bytes.len() as u64;
170        self.progress.chunk_index += 1;
171        Ok(())
172    }
173
174    async fn close(&mut self, reason: Option<&str>) -> StreamResult<()> {
175        if self.is_closed {
176            Err(StreamError::AlreadyClosed)?
177        }
178        let packet = Self::create_trailer_packet(&self.id, reason);
179        Self::send_packet(&self.packet_tx, packet).await?;
180        self.is_closed = true;
181        Ok(())
182    }
183
184    async fn send_packet(
185        tx: &UnboundedRequestSender<proto::DataPacket, Result<(), EngineError>>,
186        packet: proto::DataPacket,
187    ) -> StreamResult<()> {
188        tx.send_receive(packet)
189            .await
190            .map_err(|_| StreamError::Internal)? // request channel closed
191            .map_err(|_| StreamError::SendFailed) // data channel error
192    }
193
194    fn create_header_packet(
195        header: proto::data_stream::Header,
196        destination_identities: Vec<ParticipantIdentity>,
197    ) -> proto::DataPacket {
198        proto::DataPacket {
199            kind: proto::data_packet::Kind::Reliable.into(),
200            participant_identity: String::new(), // populate later
201            destination_identities: destination_identities.into_iter().map(|id| id.0).collect(),
202            value: Some(livekit_protocol::data_packet::Value::StreamHeader(header)),
203            // TODO: placeholder for reliable data transport
204            ..Default::default()
205        }
206    }
207
208    fn create_chunk_packet(id: &str, chunk_index: u64, content: &[u8]) -> proto::DataPacket {
209        let chunk = proto::data_stream::Chunk {
210            stream_id: id.to_string(),
211            chunk_index,
212            content: content.to_vec(),
213            ..Default::default()
214        };
215        proto::DataPacket {
216            kind: proto::data_packet::Kind::Reliable.into(),
217            participant_identity: String::new(), // populate later
218            value: Some(livekit_protocol::data_packet::Value::StreamChunk(chunk)),
219            ..Default::default()
220        }
221    }
222
223    fn create_trailer_packet(id: &str, reason: Option<&str>) -> proto::DataPacket {
224        let trailer = proto::data_stream::Trailer {
225            stream_id: id.to_string(),
226            reason: reason.unwrap_or_default().to_owned(),
227            ..Default::default()
228        };
229        proto::DataPacket {
230            kind: proto::data_packet::Kind::Reliable.into(),
231            participant_identity: String::new(), // populate later
232            value: Some(livekit_protocol::data_packet::Value::StreamTrailer(trailer)),
233            ..Default::default()
234        }
235    }
236}
237
238impl Drop for RawStream {
239    /// Close stream normally if not already closed.
240    fn drop(&mut self) {
241        if self.is_closed {
242            return;
243        }
244        let packet = Self::create_trailer_packet(&self.id, None);
245        let packet_tx = self.packet_tx.clone();
246        tokio::spawn(async move { Self::send_packet(&packet_tx, packet).await });
247    }
248}
249
/// Options used when opening an outgoing byte data stream.
#[derive(Clone, Default, Debug, Eq, PartialEq)]
pub struct StreamByteOptions {
    /// Topic placed in the stream header.
    pub topic: String,
    /// Attributes placed in the stream header.
    pub attributes: HashMap<String, String>,
    /// Identities the stream's header packet is addressed to.
    pub destination_identities: Vec<ParticipantIdentity>,
    /// Stream ID; a random UUID is generated when `None`.
    pub id: Option<String>,
    /// MIME type; defaults to `application/octet-stream` when `None`.
    pub mime_type: Option<String>,
    /// Name placed in the byte header; empty when `None`. When sending a
    /// file, the file name from the path is used instead.
    pub name: Option<String>,
    /// Total length placed in the stream header. Only used when opening a
    /// stream for incremental writing; sending an in-memory buffer or a file
    /// uses the actual data length instead.
    pub total_length: Option<u64>,
}
261
/// Options used when opening an outgoing text data stream.
#[derive(Clone, Default, Debug, Eq, PartialEq)]
pub struct StreamTextOptions {
    /// Topic placed in the stream header.
    pub topic: String,
    /// Attributes placed in the stream header.
    pub attributes: HashMap<String, String>,
    /// Identities the stream's header packet is addressed to.
    pub destination_identities: Vec<ParticipantIdentity>,
    /// Stream ID; a random UUID is generated when `None`.
    pub id: Option<String>,
    /// Operation type placed in the text header; the type's default is used
    /// when `None`.
    pub operation_type: Option<OperationType>,
    /// Version placed in the text header; `0` when `None`.
    pub version: Option<i32>,
    /// ID of the stream being replied to; empty when `None`.
    pub reply_to_stream_id: Option<String>,
    /// IDs of attached streams, placed in the text header as given.
    pub attached_stream_ids: Vec<String>,
    /// Value of the text header's `generated` flag; `false` when `None`.
    pub generated: Option<bool>,
}
275
/// Opens outgoing data streams and sends their packets over a request
/// channel.
///
/// Clones share the same channel; every stream opened by the manager gets its
/// own clone of the sender.
#[derive(Clone)]
pub(crate) struct OutgoingStreamManager {
    /// Request channel for sending packets.
    packet_tx: UnboundedRequestSender<proto::DataPacket, Result<(), EngineError>>,
}
281
282impl OutgoingStreamManager {
283    pub fn new() -> (Self, UnboundedRequestReceiver<proto::DataPacket, Result<(), EngineError>>) {
284        let (packet_tx, packet_rx) = bmrng::unbounded_channel();
285        let manager = Self { packet_tx };
286        (manager, packet_rx)
287    }
288
289    pub async fn stream_text(&self, options: StreamTextOptions) -> StreamResult<TextStreamWriter> {
290        let text_header = proto::data_stream::TextHeader {
291            operation_type: options.operation_type.unwrap_or_default() as i32,
292            version: options.version.unwrap_or_default(),
293            reply_to_stream_id: options.reply_to_stream_id.unwrap_or_default(),
294            attached_stream_ids: options.attached_stream_ids,
295            generated: options.generated.unwrap_or_default(),
296        };
297        let header = proto::data_stream::Header {
298            stream_id: options.id.unwrap_or_else(|| create_random_uuid()),
299            timestamp: Utc::now().timestamp_millis(),
300            topic: options.topic,
301            mime_type: TEXT_MIME_TYPE.to_owned(),
302            total_length: None,
303            encryption_type: proto::encryption::Type::None.into(),
304            attributes: options.attributes,
305            content_header: Some(proto::data_stream::header::ContentHeader::TextHeader(
306                text_header.clone(),
307            )),
308        };
309        let open_options = RawStreamOpenOptions {
310            header: header.clone(),
311            destination_identities: options.destination_identities,
312            packet_tx: self.packet_tx.clone(),
313        };
314        let writer = TextStreamWriter {
315            info: Arc::new(TextStreamInfo::from_headers(header, text_header)),
316            stream: Arc::new(Mutex::new(RawStream::open(open_options).await?)),
317        };
318        Ok(writer)
319    }
320
321    pub async fn stream_bytes(&self, options: StreamByteOptions) -> StreamResult<ByteStreamWriter> {
322        let byte_header = proto::data_stream::ByteHeader { name: options.name.unwrap_or_default() };
323        let header = proto::data_stream::Header {
324            stream_id: options.id.unwrap_or_else(|| create_random_uuid()),
325            timestamp: Utc::now().timestamp_millis(),
326            topic: options.topic,
327            mime_type: options.mime_type.unwrap_or_else(|| BYTE_MIME_TYPE.to_owned()),
328            total_length: options.total_length,
329            encryption_type: proto::encryption::Type::None.into(),
330            attributes: options.attributes,
331            content_header: Some(proto::data_stream::header::ContentHeader::ByteHeader(
332                byte_header.clone(),
333            )),
334        };
335
336        let open_options = RawStreamOpenOptions {
337            header: header.clone(),
338            destination_identities: options.destination_identities,
339            packet_tx: self.packet_tx.clone(),
340        };
341        let writer = ByteStreamWriter {
342            info: Arc::new(ByteStreamInfo::from_headers(header, byte_header)),
343            stream: Arc::new(Mutex::new(RawStream::open(open_options).await?)),
344        };
345        Ok(writer)
346    }
347
348    pub async fn send_text(
349        &self,
350        text: &str,
351        options: StreamTextOptions,
352    ) -> StreamResult<TextStreamInfo> {
353        let text_header = proto::data_stream::TextHeader {
354            operation_type: options.operation_type.unwrap_or_default() as i32,
355            version: options.version.unwrap_or_default(),
356            reply_to_stream_id: options.reply_to_stream_id.unwrap_or_default(),
357            attached_stream_ids: options.attached_stream_ids,
358            generated: options.generated.unwrap_or_default(),
359        };
360        let header = proto::data_stream::Header {
361            stream_id: options.id.unwrap_or_else(|| create_random_uuid()),
362            timestamp: Utc::now().timestamp_millis(),
363            topic: options.topic,
364            mime_type: TEXT_MIME_TYPE.to_owned(),
365            total_length: Some(text.bytes().len() as u64),
366            encryption_type: proto::encryption::Type::None.into(),
367            attributes: options.attributes,
368            content_header: Some(proto::data_stream::header::ContentHeader::TextHeader(
369                text_header.clone(),
370            )),
371        };
372        let open_options = RawStreamOpenOptions {
373            header: header.clone(),
374            destination_identities: options.destination_identities,
375            packet_tx: self.packet_tx.clone(),
376        };
377        let writer = TextStreamWriter {
378            info: Arc::new(TextStreamInfo::from_headers(header, text_header)),
379            stream: Arc::new(Mutex::new(RawStream::open(open_options).await?)),
380        };
381
382        let info = (*writer.info).clone();
383        writer.write(text).await?;
384        writer.close().await?;
385
386        Ok(info)
387    }
388
389    /// Send bytes to participants in the room.
390    ///
391    /// This method sends an in-memory blob of bytes to participants in the room
392    /// as a byte stream. It opens a stream using the provided options, writes the
393    /// entire buffer, and closes the stream before returning.
394    ///
395    /// The `total_length` in the header is set from the provided data and is not
396    /// overridable by `options.total_length`.
397    pub async fn send_bytes(
398        &self,
399        data: impl AsRef<[u8]>,
400        options: StreamByteOptions,
401    ) -> StreamResult<ByteStreamInfo> {
402        if options.total_length.is_some() {
403            log::warn!("Ignoring total_length option specified for send_bytes");
404        }
405        let bytes = data.as_ref();
406
407        let byte_header = proto::data_stream::ByteHeader { name: options.name.unwrap_or_default() };
408        let header = proto::data_stream::Header {
409            stream_id: options.id.unwrap_or_else(|| create_random_uuid()),
410            timestamp: Utc::now().timestamp_millis(),
411            topic: options.topic,
412            mime_type: options.mime_type.unwrap_or_else(|| BYTE_MIME_TYPE.to_owned()),
413            total_length: Some(bytes.len() as u64), // not overridable
414            encryption_type: proto::encryption::Type::None.into(),
415            attributes: options.attributes,
416            content_header: Some(proto::data_stream::header::ContentHeader::ByteHeader(
417                byte_header.clone(),
418            )),
419        };
420
421        let open_options = RawStreamOpenOptions {
422            header: header.clone(),
423            destination_identities: options.destination_identities,
424            packet_tx: self.packet_tx.clone(),
425        };
426        let writer = ByteStreamWriter {
427            info: Arc::new(ByteStreamInfo::from_headers(header, byte_header)),
428            stream: Arc::new(Mutex::new(RawStream::open(open_options).await?)),
429        };
430
431        let info = (*writer.info).clone();
432        writer.write(bytes).await?;
433        writer.close().await?;
434
435        Ok(info)
436    }
437
438    pub async fn send_file(
439        &self,
440        path: impl AsRef<Path>,
441        options: StreamByteOptions,
442    ) -> StreamResult<ByteStreamInfo> {
443        let file_size = tokio::fs::metadata(path.as_ref())
444            .await
445            .map(|metadata| metadata.len())
446            .map_err(|e| StreamError::from(e))?;
447        let name =
448            path.as_ref().file_name().and_then(|n| n.to_str()).unwrap_or_default().to_owned();
449
450        let byte_header = proto::data_stream::ByteHeader { name };
451        let header = proto::data_stream::Header {
452            stream_id: options.id.unwrap_or_else(|| create_random_uuid()),
453            timestamp: Utc::now().timestamp_millis(),
454            topic: options.topic,
455            mime_type: options.mime_type.unwrap_or_else(|| BYTE_MIME_TYPE.to_owned()),
456            total_length: Some(file_size as u64), // not overridable
457            encryption_type: proto::encryption::Type::None.into(),
458            attributes: options.attributes,
459            content_header: Some(proto::data_stream::header::ContentHeader::ByteHeader(
460                byte_header.clone(),
461            )),
462        };
463
464        let open_options = RawStreamOpenOptions {
465            header: header.clone(),
466            destination_identities: options.destination_identities,
467            packet_tx: self.packet_tx.clone(),
468        };
469        let writer = ByteStreamWriter {
470            info: Arc::new(ByteStreamInfo::from_headers(header, byte_header)),
471            stream: Arc::new(Mutex::new(RawStream::open(open_options).await?)),
472        };
473
474        let info = (*writer.info).clone();
475        writer.write_file_contents(path).await?;
476        writer.close().await?;
477
478        Ok(info)
479    }
480}
481
/// Maximum number of bytes to send in a single chunk.
const CHUNK_SIZE: usize = 15_000;

/// Default MIME type to use for byte streams.
const BYTE_MIME_TYPE: &str = "application/octet-stream";

/// Default MIME type to use for text streams.
const TEXT_MIME_TYPE: &str = "text/plain";