1use super::{
16 ByteStreamInfo, OperationType, StreamError, StreamProgress, StreamResult, TextStreamInfo,
17};
18use crate::{
19 id::ParticipantIdentity, rtc_engine::EngineError, utils::utf8_chunk::Utf8AwareChunkExt,
20};
21use bmrng::unbounded::{UnboundedRequestReceiver, UnboundedRequestSender};
22use chrono::Utc;
23use gosuto_libwebrtc::native::create_random_uuid;
24use livekit_protocol as proto;
25use std::{collections::HashMap, path::Path, sync::Arc};
26use tokio::{io::AsyncReadExt, sync::Mutex};
27
28pub trait StreamWriter<'a> {
30 type Input: 'a;
32
33 type Info;
35
36 fn info(&self) -> &Self::Info;
38
39 fn write(
41 &self,
42 input: Self::Input,
43 ) -> impl std::future::Future<Output = StreamResult<()>> + Send;
44
45 fn close(self) -> impl std::future::Future<Output = StreamResult<()>> + Send;
47
48 fn close_with_reason(
50 self,
51 reason: &str,
52 ) -> impl std::future::Future<Output = StreamResult<()>> + Send;
53}
54
55#[derive(Clone)]
56pub struct ByteStreamWriter {
58 info: Arc<ByteStreamInfo>,
59 stream: Arc<Mutex<RawStream>>,
60}
61
62#[derive(Clone)]
63pub struct TextStreamWriter {
65 info: Arc<TextStreamInfo>,
66 stream: Arc<Mutex<RawStream>>,
67}
68
69impl<'a> StreamWriter<'a> for ByteStreamWriter {
70 type Input = &'a [u8];
71 type Info = ByteStreamInfo;
72
73 fn info(&self) -> &Self::Info {
74 &self.info
75 }
76
77 async fn write(&self, bytes: &'a [u8]) -> StreamResult<()> {
78 let mut stream = self.stream.lock().await;
79 for chunk in bytes.chunks(CHUNK_SIZE) {
80 stream.write_chunk(chunk).await?;
81 }
82 Ok(())
83 }
84
85 async fn close(self) -> StreamResult<()> {
86 self.stream.lock().await.close(None).await
87 }
88
89 async fn close_with_reason(self, reason: &str) -> StreamResult<()> {
90 self.stream.lock().await.close(Some(reason)).await
91 }
92}
93
94impl ByteStreamWriter {
95 async fn write_file_contents(&self, path: impl AsRef<Path>) -> StreamResult<()> {
97 let mut stream = self.stream.lock().await;
98 let mut file = tokio::fs::File::open(path).await?;
99 let mut buffer = vec![0; 8192]; loop {
101 let bytes_read = file.read(&mut buffer).await?;
102 if bytes_read == 0 {
103 break;
104 }
105 stream.write_chunk(&buffer[..bytes_read]).await?;
106 }
107 Ok(())
108 }
109}
110
111impl<'a> StreamWriter<'a> for TextStreamWriter {
112 type Input = &'a str;
113 type Info = TextStreamInfo;
114
115 fn info(&self) -> &Self::Info {
116 &self.info
117 }
118
119 async fn write(&self, text: &'a str) -> StreamResult<()> {
120 let mut stream = self.stream.lock().await;
121 for chunk in text.as_bytes().utf8_aware_chunks(CHUNK_SIZE) {
122 stream.write_chunk(chunk).await?;
123 }
124 Ok(())
125 }
126
127 async fn close(self) -> StreamResult<()> {
128 self.stream.lock().await.close(None).await
129 }
130
131 async fn close_with_reason(self, reason: &str) -> StreamResult<()> {
132 self.stream.lock().await.close(Some(reason)).await
133 }
134}
135
136struct RawStreamOpenOptions {
137 header: proto::data_stream::Header,
138 destination_identities: Vec<ParticipantIdentity>,
139 packet_tx: UnboundedRequestSender<proto::DataPacket, Result<(), EngineError>>,
140}
141
142struct RawStream {
143 id: String,
144 progress: StreamProgress,
145 is_closed: bool,
146 packet_tx: UnboundedRequestSender<proto::DataPacket, Result<(), EngineError>>,
148}
149
150impl RawStream {
151 async fn open(options: RawStreamOpenOptions) -> StreamResult<Self> {
152 let id = options.header.stream_id.to_string();
153 let bytes_total = options.header.total_length;
154
155 let packet = Self::create_header_packet(options.header, options.destination_identities);
156 Self::send_packet(&options.packet_tx, packet).await?;
157
158 Ok(Self {
159 id,
160 progress: StreamProgress { bytes_total, ..Default::default() },
161 is_closed: false,
162 packet_tx: options.packet_tx,
163 })
164 }
165
166 async fn write_chunk(&mut self, bytes: &[u8]) -> StreamResult<()> {
167 let packet = Self::create_chunk_packet(&self.id, self.progress.chunk_index, bytes);
168 Self::send_packet(&self.packet_tx, packet).await?;
169 self.progress.bytes_processed += bytes.len() as u64;
170 self.progress.chunk_index += 1;
171 Ok(())
172 }
173
174 async fn close(&mut self, reason: Option<&str>) -> StreamResult<()> {
175 if self.is_closed {
176 Err(StreamError::AlreadyClosed)?
177 }
178 let packet = Self::create_trailer_packet(&self.id, reason);
179 Self::send_packet(&self.packet_tx, packet).await?;
180 self.is_closed = true;
181 Ok(())
182 }
183
184 async fn send_packet(
185 tx: &UnboundedRequestSender<proto::DataPacket, Result<(), EngineError>>,
186 packet: proto::DataPacket,
187 ) -> StreamResult<()> {
188 tx.send_receive(packet)
189 .await
190 .map_err(|_| StreamError::Internal)? .map_err(|_| StreamError::SendFailed) }
193
194 fn create_header_packet(
195 header: proto::data_stream::Header,
196 destination_identities: Vec<ParticipantIdentity>,
197 ) -> proto::DataPacket {
198 proto::DataPacket {
199 kind: proto::data_packet::Kind::Reliable.into(),
200 participant_identity: String::new(), destination_identities: destination_identities.into_iter().map(|id| id.0).collect(),
202 value: Some(livekit_protocol::data_packet::Value::StreamHeader(header)),
203 ..Default::default()
205 }
206 }
207
208 fn create_chunk_packet(id: &str, chunk_index: u64, content: &[u8]) -> proto::DataPacket {
209 let chunk = proto::data_stream::Chunk {
210 stream_id: id.to_string(),
211 chunk_index,
212 content: content.to_vec(),
213 ..Default::default()
214 };
215 proto::DataPacket {
216 kind: proto::data_packet::Kind::Reliable.into(),
217 participant_identity: String::new(), value: Some(livekit_protocol::data_packet::Value::StreamChunk(chunk)),
219 ..Default::default()
220 }
221 }
222
223 fn create_trailer_packet(id: &str, reason: Option<&str>) -> proto::DataPacket {
224 let trailer = proto::data_stream::Trailer {
225 stream_id: id.to_string(),
226 reason: reason.unwrap_or_default().to_owned(),
227 ..Default::default()
228 };
229 proto::DataPacket {
230 kind: proto::data_packet::Kind::Reliable.into(),
231 participant_identity: String::new(), value: Some(livekit_protocol::data_packet::Value::StreamTrailer(trailer)),
233 ..Default::default()
234 }
235 }
236}
237
238impl Drop for RawStream {
239 fn drop(&mut self) {
241 if self.is_closed {
242 return;
243 }
244 let packet = Self::create_trailer_packet(&self.id, None);
245 let packet_tx = self.packet_tx.clone();
246 tokio::spawn(async move { Self::send_packet(&packet_tx, packet).await });
247 }
248}
249
250#[derive(Clone, Default, Debug, Eq, PartialEq)]
252pub struct StreamByteOptions {
253 pub topic: String,
254 pub attributes: HashMap<String, String>,
255 pub destination_identities: Vec<ParticipantIdentity>,
256 pub id: Option<String>,
257 pub mime_type: Option<String>,
258 pub name: Option<String>,
259 pub total_length: Option<u64>,
260}
261
262#[derive(Clone, Default, Debug, Eq, PartialEq)]
264pub struct StreamTextOptions {
265 pub topic: String,
266 pub attributes: HashMap<String, String>,
267 pub destination_identities: Vec<ParticipantIdentity>,
268 pub id: Option<String>,
269 pub operation_type: Option<OperationType>,
270 pub version: Option<i32>,
271 pub reply_to_stream_id: Option<String>,
272 pub attached_stream_ids: Vec<String>,
273 pub generated: Option<bool>,
274}
275
276#[derive(Clone)]
277pub(crate) struct OutgoingStreamManager {
278 packet_tx: UnboundedRequestSender<proto::DataPacket, Result<(), EngineError>>,
280}
281
282impl OutgoingStreamManager {
283 pub fn new() -> (Self, UnboundedRequestReceiver<proto::DataPacket, Result<(), EngineError>>) {
284 let (packet_tx, packet_rx) = bmrng::unbounded_channel();
285 let manager = Self { packet_tx };
286 (manager, packet_rx)
287 }
288
289 pub async fn stream_text(&self, options: StreamTextOptions) -> StreamResult<TextStreamWriter> {
290 let text_header = proto::data_stream::TextHeader {
291 operation_type: options.operation_type.unwrap_or_default() as i32,
292 version: options.version.unwrap_or_default(),
293 reply_to_stream_id: options.reply_to_stream_id.unwrap_or_default(),
294 attached_stream_ids: options.attached_stream_ids,
295 generated: options.generated.unwrap_or_default(),
296 };
297 let header = proto::data_stream::Header {
298 stream_id: options.id.unwrap_or_else(|| create_random_uuid()),
299 timestamp: Utc::now().timestamp_millis(),
300 topic: options.topic,
301 mime_type: TEXT_MIME_TYPE.to_owned(),
302 total_length: None,
303 encryption_type: proto::encryption::Type::None.into(),
304 attributes: options.attributes,
305 content_header: Some(proto::data_stream::header::ContentHeader::TextHeader(
306 text_header.clone(),
307 )),
308 };
309 let open_options = RawStreamOpenOptions {
310 header: header.clone(),
311 destination_identities: options.destination_identities,
312 packet_tx: self.packet_tx.clone(),
313 };
314 let writer = TextStreamWriter {
315 info: Arc::new(TextStreamInfo::from_headers(header, text_header)),
316 stream: Arc::new(Mutex::new(RawStream::open(open_options).await?)),
317 };
318 Ok(writer)
319 }
320
321 pub async fn stream_bytes(&self, options: StreamByteOptions) -> StreamResult<ByteStreamWriter> {
322 let byte_header = proto::data_stream::ByteHeader { name: options.name.unwrap_or_default() };
323 let header = proto::data_stream::Header {
324 stream_id: options.id.unwrap_or_else(|| create_random_uuid()),
325 timestamp: Utc::now().timestamp_millis(),
326 topic: options.topic,
327 mime_type: options.mime_type.unwrap_or_else(|| BYTE_MIME_TYPE.to_owned()),
328 total_length: options.total_length,
329 encryption_type: proto::encryption::Type::None.into(),
330 attributes: options.attributes,
331 content_header: Some(proto::data_stream::header::ContentHeader::ByteHeader(
332 byte_header.clone(),
333 )),
334 };
335
336 let open_options = RawStreamOpenOptions {
337 header: header.clone(),
338 destination_identities: options.destination_identities,
339 packet_tx: self.packet_tx.clone(),
340 };
341 let writer = ByteStreamWriter {
342 info: Arc::new(ByteStreamInfo::from_headers(header, byte_header)),
343 stream: Arc::new(Mutex::new(RawStream::open(open_options).await?)),
344 };
345 Ok(writer)
346 }
347
348 pub async fn send_text(
349 &self,
350 text: &str,
351 options: StreamTextOptions,
352 ) -> StreamResult<TextStreamInfo> {
353 let text_header = proto::data_stream::TextHeader {
354 operation_type: options.operation_type.unwrap_or_default() as i32,
355 version: options.version.unwrap_or_default(),
356 reply_to_stream_id: options.reply_to_stream_id.unwrap_or_default(),
357 attached_stream_ids: options.attached_stream_ids,
358 generated: options.generated.unwrap_or_default(),
359 };
360 let header = proto::data_stream::Header {
361 stream_id: options.id.unwrap_or_else(|| create_random_uuid()),
362 timestamp: Utc::now().timestamp_millis(),
363 topic: options.topic,
364 mime_type: TEXT_MIME_TYPE.to_owned(),
365 total_length: Some(text.bytes().len() as u64),
366 encryption_type: proto::encryption::Type::None.into(),
367 attributes: options.attributes,
368 content_header: Some(proto::data_stream::header::ContentHeader::TextHeader(
369 text_header.clone(),
370 )),
371 };
372 let open_options = RawStreamOpenOptions {
373 header: header.clone(),
374 destination_identities: options.destination_identities,
375 packet_tx: self.packet_tx.clone(),
376 };
377 let writer = TextStreamWriter {
378 info: Arc::new(TextStreamInfo::from_headers(header, text_header)),
379 stream: Arc::new(Mutex::new(RawStream::open(open_options).await?)),
380 };
381
382 let info = (*writer.info).clone();
383 writer.write(text).await?;
384 writer.close().await?;
385
386 Ok(info)
387 }
388
389 pub async fn send_bytes(
398 &self,
399 data: impl AsRef<[u8]>,
400 options: StreamByteOptions,
401 ) -> StreamResult<ByteStreamInfo> {
402 if options.total_length.is_some() {
403 log::warn!("Ignoring total_length option specified for send_bytes");
404 }
405 let bytes = data.as_ref();
406
407 let byte_header = proto::data_stream::ByteHeader { name: options.name.unwrap_or_default() };
408 let header = proto::data_stream::Header {
409 stream_id: options.id.unwrap_or_else(|| create_random_uuid()),
410 timestamp: Utc::now().timestamp_millis(),
411 topic: options.topic,
412 mime_type: options.mime_type.unwrap_or_else(|| BYTE_MIME_TYPE.to_owned()),
413 total_length: Some(bytes.len() as u64), encryption_type: proto::encryption::Type::None.into(),
415 attributes: options.attributes,
416 content_header: Some(proto::data_stream::header::ContentHeader::ByteHeader(
417 byte_header.clone(),
418 )),
419 };
420
421 let open_options = RawStreamOpenOptions {
422 header: header.clone(),
423 destination_identities: options.destination_identities,
424 packet_tx: self.packet_tx.clone(),
425 };
426 let writer = ByteStreamWriter {
427 info: Arc::new(ByteStreamInfo::from_headers(header, byte_header)),
428 stream: Arc::new(Mutex::new(RawStream::open(open_options).await?)),
429 };
430
431 let info = (*writer.info).clone();
432 writer.write(bytes).await?;
433 writer.close().await?;
434
435 Ok(info)
436 }
437
438 pub async fn send_file(
439 &self,
440 path: impl AsRef<Path>,
441 options: StreamByteOptions,
442 ) -> StreamResult<ByteStreamInfo> {
443 let file_size = tokio::fs::metadata(path.as_ref())
444 .await
445 .map(|metadata| metadata.len())
446 .map_err(|e| StreamError::from(e))?;
447 let name =
448 path.as_ref().file_name().and_then(|n| n.to_str()).unwrap_or_default().to_owned();
449
450 let byte_header = proto::data_stream::ByteHeader { name };
451 let header = proto::data_stream::Header {
452 stream_id: options.id.unwrap_or_else(|| create_random_uuid()),
453 timestamp: Utc::now().timestamp_millis(),
454 topic: options.topic,
455 mime_type: options.mime_type.unwrap_or_else(|| BYTE_MIME_TYPE.to_owned()),
456 total_length: Some(file_size as u64), encryption_type: proto::encryption::Type::None.into(),
458 attributes: options.attributes,
459 content_header: Some(proto::data_stream::header::ContentHeader::ByteHeader(
460 byte_header.clone(),
461 )),
462 };
463
464 let open_options = RawStreamOpenOptions {
465 header: header.clone(),
466 destination_identities: options.destination_identities,
467 packet_tx: self.packet_tx.clone(),
468 };
469 let writer = ByteStreamWriter {
470 info: Arc::new(ByteStreamInfo::from_headers(header, byte_header)),
471 stream: Arc::new(Mutex::new(RawStream::open(open_options).await?)),
472 };
473
474 let info = (*writer.info).clone();
475 writer.write_file_contents(path).await?;
476 writer.close().await?;
477
478 Ok(info)
479 }
480}
481
/// Maximum number of bytes sent in a single stream chunk.
const CHUNK_SIZE: usize = 15000;

/// MIME type used for byte streams when the options do not specify one.
const BYTE_MIME_TYPE: &str = "application/octet-stream";

/// MIME type used for every text stream.
const TEXT_MIME_TYPE: &str = "text/plain";