gosuto_livekit/room/data_stream/
mod.rs1use chrono::{DateTime, Utc};
16use livekit_protocol::{data_stream as proto, enum_dispatch};
17use std::collections::HashMap;
18use thiserror::Error;
19
20mod incoming;
21mod outgoing;
22
23pub use incoming::*;
24pub use outgoing::*;
25
26use crate::e2ee::EncryptionType;
27
28pub type StreamResult<T> = Result<T, StreamError>;
30
31#[derive(Debug, Error)]
33pub enum StreamError {
34 #[error("stream has already been closed")]
36 AlreadyClosed,
37
38 #[error("stream closed abnormally: {0}")]
39 AbnormalEnd(String),
40
41 #[error("UTF-8 decoding error: {0}")]
42 Utf8(#[from] std::string::FromUtf8Error),
43
44 #[error("incoming header was invalid")]
45 InvalidHeader,
46
47 #[error("expected chunk index to be exactly one more than the previous")]
48 MissedChunk,
49
50 #[error("read length exceeded total length specified in stream header")]
51 LengthExceeded,
52
53 #[error("stream data is incomplete")]
54 Incomplete,
55
56 #[error("unable to send packet")]
57 SendFailed,
58
59 #[error("I/O error: {0}")]
60 Io(#[from] std::io::Error),
61
62 #[error("internal error")]
63 Internal,
64
65 #[error("encryption type mismatch")]
66 EncryptionTypeMismatch,
67}
68
69#[derive(Clone, Copy, Default, Debug, Hash, Eq, PartialEq)]
71struct StreamProgress {
72 chunk_index: u64,
73 bytes_processed: u64,
75 bytes_total: Option<u64>,
77}
78
79impl StreamProgress {
80 #[allow(dead_code)]
82 fn percentage(&self) -> Option<f32> {
83 self.bytes_total.map(|total| self.bytes_processed as f32 / total as f32)
84 }
85}
86
87#[derive(Clone, Debug)]
89pub struct ByteStreamInfo {
90 pub id: String,
92 pub topic: String,
94 pub timestamp: DateTime<Utc>,
96 pub total_length: Option<u64>,
98 pub attributes: HashMap<String, String>,
100 pub mime_type: String,
102 pub name: String,
104 pub encryption_type: EncryptionType,
106}
107
108#[derive(Clone, Debug)]
110pub struct TextStreamInfo {
111 pub id: String,
113 pub topic: String,
115 pub timestamp: DateTime<Utc>,
117 pub total_length: Option<u64>,
119 pub attributes: HashMap<String, String>,
121 pub mime_type: String,
123 pub operation_type: OperationType,
124 pub version: i32,
125 pub reply_to_stream_id: Option<String>,
126 pub attached_stream_ids: Vec<String>,
127 pub generated: bool,
128 pub encryption_type: EncryptionType,
130}
131
132#[derive(Clone, Copy, Default, Debug, Hash, Eq, PartialEq)]
134pub enum OperationType {
135 #[default]
136 Create,
137 Update,
138 Delete,
139 Reaction,
140}
141
142impl TryFrom<proto::Header> for AnyStreamInfo {
145 type Error = StreamError;
146
147 fn try_from(mut header: proto::Header) -> Result<Self, Self::Error> {
148 Self::try_from_with_encryption(header, EncryptionType::None)
149 }
150}
151
152impl AnyStreamInfo {
153 pub fn try_from_with_encryption(
154 mut header: proto::Header,
155 encryption_type: EncryptionType,
156 ) -> Result<Self, StreamError> {
157 let Some(content_header) = header.content_header.take() else {
158 Err(StreamError::InvalidHeader)?
159 };
160 let info = match content_header {
161 proto::header::ContentHeader::ByteHeader(byte_header) => Self::Byte(
162 ByteStreamInfo::from_headers_with_encryption(header, byte_header, encryption_type),
163 ),
164 proto::header::ContentHeader::TextHeader(text_header) => Self::Text(
165 TextStreamInfo::from_headers_with_encryption(header, text_header, encryption_type),
166 ),
167 };
168 Ok(info)
169 }
170}
171
172impl ByteStreamInfo {
173 pub(crate) fn from_headers(header: proto::Header, byte_header: proto::ByteHeader) -> Self {
174 Self::from_headers_with_encryption(header, byte_header, EncryptionType::None)
175 }
176
177 pub(crate) fn from_headers_with_encryption(
178 header: proto::Header,
179 byte_header: proto::ByteHeader,
180 encryption_type: EncryptionType,
181 ) -> Self {
182 Self {
183 id: header.stream_id,
184 topic: header.topic,
185 timestamp: DateTime::<Utc>::from_timestamp_millis(header.timestamp)
186 .unwrap_or_else(|| Utc::now()),
187 total_length: header.total_length,
188 attributes: header.attributes,
189 mime_type: header.mime_type,
190 name: byte_header.name,
191 encryption_type,
192 }
193 }
194}
195
196impl TextStreamInfo {
197 pub(crate) fn from_headers(header: proto::Header, text_header: proto::TextHeader) -> Self {
198 Self::from_headers_with_encryption(header, text_header, EncryptionType::None)
199 }
200
201 pub(crate) fn from_headers_with_encryption(
202 header: proto::Header,
203 text_header: proto::TextHeader,
204 encryption_type: EncryptionType,
205 ) -> Self {
206 Self {
207 id: header.stream_id,
208 topic: header.topic,
209 timestamp: DateTime::<Utc>::from_timestamp_millis(header.timestamp)
210 .unwrap_or_else(|| Utc::now()),
211 total_length: header.total_length,
212 attributes: header.attributes,
213 mime_type: header.mime_type,
214 operation_type: text_header.operation_type().into(),
215 version: text_header.version,
216 reply_to_stream_id: (!text_header.reply_to_stream_id.is_empty())
217 .then_some(text_header.reply_to_stream_id),
218 attached_stream_ids: text_header.attached_stream_ids,
219 generated: text_header.generated,
220 encryption_type,
221 }
222 }
223}
224
225impl From<proto::OperationType> for OperationType {
226 fn from(op_type: proto::OperationType) -> Self {
227 match op_type {
228 proto::OperationType::Create => OperationType::Create,
229 proto::OperationType::Update => OperationType::Update,
230 proto::OperationType::Delete => OperationType::Delete,
231 proto::OperationType::Reaction => OperationType::Reaction,
232 }
233 }
234}
235#[derive(Clone, Debug)]
238pub(crate) enum AnyStreamInfo {
239 Byte(ByteStreamInfo),
240 Text(TextStreamInfo),
241}
242
243impl AnyStreamInfo {
244 enum_dispatch!(
245 [Byte, Text];
246 pub fn id(self: &Self) -> &str;
247 pub fn total_length(self: &Self) -> Option<u64>;
248 pub fn encryption_type(self: &Self) -> EncryptionType;
249 );
250}
251
252#[rustfmt::skip]
253macro_rules! stream_info {
254 () => {
255 fn id(&self) -> &str { &self.id }
256 fn total_length(&self) -> Option<u64> { self.total_length }
257 fn encryption_type(&self) -> EncryptionType { self.encryption_type }
258 };
259}
260
261impl ByteStreamInfo {
262 stream_info!();
263}
264
265impl TextStreamInfo {
266 stream_info!();
267}
268
269impl From<ByteStreamInfo> for AnyStreamInfo {
270 fn from(info: ByteStreamInfo) -> Self {
271 Self::Byte(info)
272 }
273}
274
275impl From<TextStreamInfo> for AnyStreamInfo {
276 fn from(info: TextStreamInfo) -> Self {
277 Self::Text(info)
278 }
279}