Skip to main content

gosuto_livekit/room/data_stream/
mod.rs

1// Copyright 2025 LiveKit, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use chrono::{DateTime, Utc};
16use livekit_protocol::{data_stream as proto, enum_dispatch};
17use std::collections::HashMap;
18use thiserror::Error;
19
20mod incoming;
21mod outgoing;
22
23pub use incoming::*;
24pub use outgoing::*;
25
26use crate::e2ee::EncryptionType;
27
/// Result type for data stream operations.
///
/// Shorthand for `Result<T, StreamError>`.
pub type StreamResult<T> = Result<T, StreamError>;
30
/// Error type for data stream operations.
///
/// The display message of each variant comes from its `#[error(...)]`
/// attribute. Variants whose payload is marked `#[from]` can be created from
/// the wrapped error type with `?` / `From`.
#[derive(Debug, Error)]
pub enum StreamError {
    // TODO(ladvoc): standardize error cases and expose over FFI.
    /// The stream has already been closed.
    #[error("stream has already been closed")]
    AlreadyClosed,

    /// The stream closed abnormally; the string payload is appended to the
    /// display message.
    #[error("stream closed abnormally: {0}")]
    AbnormalEnd(String),

    /// Bytes could not be decoded as UTF-8. Converts from
    /// `std::string::FromUtf8Error`.
    #[error("UTF-8 decoding error: {0}")]
    Utf8(#[from] std::string::FromUtf8Error),

    /// The incoming stream header was invalid. Returned by
    /// `AnyStreamInfo::try_from_with_encryption` when the header carries no
    /// content header.
    #[error("incoming header was invalid")]
    InvalidHeader,

    /// A chunk arrived whose index was not exactly one more than the
    /// previous chunk's index.
    #[error("expected chunk index to be exactly one more than the previous")]
    MissedChunk,

    /// More data was read than the total length declared in the stream header.
    #[error("read length exceeded total length specified in stream header")]
    LengthExceeded,

    /// The stream data is incomplete.
    #[error("stream data is incomplete")]
    Incomplete,

    /// A packet could not be sent.
    #[error("unable to send packet")]
    SendFailed,

    /// An I/O error occurred. Converts from `std::io::Error`.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// An internal error with no further detail.
    #[error("internal error")]
    Internal,

    /// Encryption types did not match.
    // NOTE(review): presumably raised when a stream's recorded encryption type
    // differs from the one observed on its packets — confirm in `incoming`.
    #[error("encryption type mismatch")]
    EncryptionTypeMismatch,
}
68
/// Progress of a data stream.
#[derive(Clone, Copy, Default, Debug, Hash, Eq, PartialEq)]
struct StreamProgress {
    /// Chunk index counter for the stream.
    // NOTE(review): whether this holds the last-seen or the next-expected index
    // is decided by the reader/writer code — confirm there.
    chunk_index: u64,
    /// Number of bytes read or written so far.
    bytes_processed: u64,
    /// Total number of bytes expected to be read or written for finite streams.
    bytes_total: Option<u64>,
}

impl StreamProgress {
    /// Returns the completion ratio for finite streams.
    ///
    /// Despite the name, the value is a fraction (`bytes_processed / bytes_total`,
    /// so `1.0` means complete), not a number between 0 and 100.
    ///
    /// Returns `None` when the total length is unknown. A finite stream whose
    /// total length is zero has nothing left to transfer and is reported as
    /// complete (`Some(1.0)`) rather than producing NaN or infinity from a
    /// division by zero.
    #[allow(dead_code)]
    fn percentage(&self) -> Option<f32> {
        self.bytes_total.map(|total| {
            if total == 0 {
                return 1.0;
            }
            // Divide in f64 and narrow once at the end: converting each u64 to
            // f32 first would round both operands before the division.
            (self.bytes_processed as f64 / total as f64) as f32
        })
    }
}
86
/// Information about a byte data stream.
///
/// Within this module, values are built from a protocol `Header` and
/// `ByteHeader` by `ByteStreamInfo::from_headers_with_encryption`.
#[derive(Clone, Debug)]
pub struct ByteStreamInfo {
    /// Unique identifier of the stream.
    pub id: String,
    /// Topic name used to route the stream to the appropriate handler.
    pub topic: String,
    /// When the stream was created.
    ///
    /// When built from a protocol header, this is the header's millisecond
    /// timestamp, or the current time if that value cannot be represented.
    pub timestamp: DateTime<Utc>,
    /// Total expected size in bytes, if known.
    pub total_length: Option<u64>,
    /// Additional attributes as needed for your application.
    pub attributes: HashMap<String, String>,
    /// The MIME type of the stream data.
    pub mime_type: String,
    /// The name of the file being sent.
    pub name: String,
    /// The encryption type recorded for this stream.
    pub encryption_type: EncryptionType,
}
107
/// Information about a text data stream.
///
/// Within this module, values are built from a protocol `Header` and
/// `TextHeader` by `TextStreamInfo::from_headers_with_encryption`.
#[derive(Clone, Debug)]
pub struct TextStreamInfo {
    /// Unique identifier of the stream.
    pub id: String,
    /// Topic name used to route the stream to the appropriate handler.
    pub topic: String,
    /// When the stream was created.
    ///
    /// When built from a protocol header, this is the header's millisecond
    /// timestamp, or the current time if that value cannot be represented.
    pub timestamp: DateTime<Utc>,
    /// Total expected size in bytes, if known.
    pub total_length: Option<u64>,
    /// Additional attributes as needed for your application.
    pub attributes: HashMap<String, String>,
    /// The MIME type of the stream data.
    pub mime_type: String,
    /// Operation type of the stream, converted from the protocol text header.
    pub operation_type: OperationType,
    /// Version number copied from the protocol text header.
    pub version: i32,
    /// Stream ID from the text header's `reply_to_stream_id` field; `None`
    /// when that field is empty.
    pub reply_to_stream_id: Option<String>,
    /// Stream IDs copied from the text header's `attached_stream_ids` field.
    pub attached_stream_ids: Vec<String>,
    /// Copied from the text header's `generated` flag.
    // NOTE(review): presumably marks machine-generated text — confirm against
    // the protocol definition.
    pub generated: bool,
    /// The encryption type recorded for this stream.
    pub encryption_type: EncryptionType,
}
131
/// Operation type for text streams.
///
/// Mirrors the protocol's `OperationType` one-to-one (see the `From`
/// conversion in this module). `Create` is the default.
// NOTE(review): the exact meaning of each operation is defined by the
// protocol, not by this file — consult the protocol documentation.
#[derive(Clone, Copy, Default, Debug, Hash, Eq, PartialEq)]
pub enum OperationType {
    /// Counterpart of the protocol's `Create` (the default value).
    #[default]
    Create,
    /// Counterpart of the protocol's `Update`.
    Update,
    /// Counterpart of the protocol's `Delete`.
    Delete,
    /// Counterpart of the protocol's `Reaction`.
    Reaction,
}
141
142// MARK: - Protocol type conversion
143
144impl TryFrom<proto::Header> for AnyStreamInfo {
145    type Error = StreamError;
146
147    fn try_from(mut header: proto::Header) -> Result<Self, Self::Error> {
148        Self::try_from_with_encryption(header, EncryptionType::None)
149    }
150}
151
152impl AnyStreamInfo {
153    pub fn try_from_with_encryption(
154        mut header: proto::Header,
155        encryption_type: EncryptionType,
156    ) -> Result<Self, StreamError> {
157        let Some(content_header) = header.content_header.take() else {
158            Err(StreamError::InvalidHeader)?
159        };
160        let info = match content_header {
161            proto::header::ContentHeader::ByteHeader(byte_header) => Self::Byte(
162                ByteStreamInfo::from_headers_with_encryption(header, byte_header, encryption_type),
163            ),
164            proto::header::ContentHeader::TextHeader(text_header) => Self::Text(
165                TextStreamInfo::from_headers_with_encryption(header, text_header, encryption_type),
166            ),
167        };
168        Ok(info)
169    }
170}
171
172impl ByteStreamInfo {
173    pub(crate) fn from_headers(header: proto::Header, byte_header: proto::ByteHeader) -> Self {
174        Self::from_headers_with_encryption(header, byte_header, EncryptionType::None)
175    }
176
177    pub(crate) fn from_headers_with_encryption(
178        header: proto::Header,
179        byte_header: proto::ByteHeader,
180        encryption_type: EncryptionType,
181    ) -> Self {
182        Self {
183            id: header.stream_id,
184            topic: header.topic,
185            timestamp: DateTime::<Utc>::from_timestamp_millis(header.timestamp)
186                .unwrap_or_else(|| Utc::now()),
187            total_length: header.total_length,
188            attributes: header.attributes,
189            mime_type: header.mime_type,
190            name: byte_header.name,
191            encryption_type,
192        }
193    }
194}
195
196impl TextStreamInfo {
197    pub(crate) fn from_headers(header: proto::Header, text_header: proto::TextHeader) -> Self {
198        Self::from_headers_with_encryption(header, text_header, EncryptionType::None)
199    }
200
201    pub(crate) fn from_headers_with_encryption(
202        header: proto::Header,
203        text_header: proto::TextHeader,
204        encryption_type: EncryptionType,
205    ) -> Self {
206        Self {
207            id: header.stream_id,
208            topic: header.topic,
209            timestamp: DateTime::<Utc>::from_timestamp_millis(header.timestamp)
210                .unwrap_or_else(|| Utc::now()),
211            total_length: header.total_length,
212            attributes: header.attributes,
213            mime_type: header.mime_type,
214            operation_type: text_header.operation_type().into(),
215            version: text_header.version,
216            reply_to_stream_id: (!text_header.reply_to_stream_id.is_empty())
217                .then_some(text_header.reply_to_stream_id),
218            attached_stream_ids: text_header.attached_stream_ids,
219            generated: text_header.generated,
220            encryption_type,
221        }
222    }
223}
224
225impl From<proto::OperationType> for OperationType {
226    fn from(op_type: proto::OperationType) -> Self {
227        match op_type {
228            proto::OperationType::Create => OperationType::Create,
229            proto::OperationType::Update => OperationType::Update,
230            proto::OperationType::Delete => OperationType::Delete,
231            proto::OperationType::Reaction => OperationType::Reaction,
232        }
233    }
234}
235// MARK: - Dispatch
236
/// Stream info for either kind of data stream, so byte and text streams can
/// be passed around as a single type.
#[derive(Clone, Debug)]
pub(crate) enum AnyStreamInfo {
    /// Info for a byte stream.
    Byte(ByteStreamInfo),
    /// Info for a text stream.
    Text(TextStreamInfo),
}
242
// Accessors available on both variants: `id`, `total_length` and
// `encryption_type`.
//
// NOTE(review): `enum_dispatch!` is imported from `livekit_protocol`; it
// presumably generates each listed method by matching on the `Byte`/`Text`
// variants and forwarding to the same-named method on the wrapped value —
// confirm against the macro definition.
impl AnyStreamInfo {
    enum_dispatch!(
        [Byte, Text];
        pub fn id(self: &Self) -> &str;
        pub fn total_length(self: &Self) -> Option<u64>;
        pub fn encryption_type(self: &Self) -> EncryptionType;
    );
}
251
// Expands to three private accessor methods — `id`, `total_length` and
// `encryption_type` — each returning the field of the same name. Intended to
// be invoked inside an `impl` block of a type that has those fields.
// `#[rustfmt::skip]` keeps the one-line method bodies from being reformatted.
#[rustfmt::skip]
macro_rules! stream_info {
    () => {
        fn id(&self) -> &str { &self.id }
        fn total_length(&self) -> Option<u64> { self.total_length }
        fn encryption_type(&self) -> EncryptionType { self.encryption_type }
    };
}
260
impl ByteStreamInfo {
    // Adds the private `id`, `total_length` and `encryption_type` accessors
    // defined by the `stream_info!` macro in this file.
    stream_info!();
}
264
impl TextStreamInfo {
    // Adds the private `id`, `total_length` and `encryption_type` accessors
    // defined by the `stream_info!` macro in this file.
    stream_info!();
}
268
269impl From<ByteStreamInfo> for AnyStreamInfo {
270    fn from(info: ByteStreamInfo) -> Self {
271        Self::Byte(info)
272    }
273}
274
275impl From<TextStreamInfo> for AnyStreamInfo {
276    fn from(info: TextStreamInfo) -> Self {
277        Self::Text(info)
278    }
279}