#![deny(missing_docs)]
#![deny(warnings)]
#![deny(unsafe_code)]
#![doc = tx5_core::__doc_header!()]
#[cfg(any(
not(any(feature = "backend-go-pion", feature = "backend-webrtc-rs")),
all(feature = "backend-go-pion", feature = "backend-webrtc-rs"),
))]
compile_error!("Must specify exactly 1 webrtc backend");
pub mod deps {
pub use tx5_core;
pub use tx5_core::deps::*;
pub use tx5_signal;
pub use tx5_signal::deps::*;
}
use deps::{serde, serde_json};
use tx5_core::Uniq;
pub use tx5_core::{Error, ErrorExt, Id, Result, Tx5Url};
pub mod actor;
mod back_buf;
pub use back_buf::*;
pub trait BytesBufExt {
fn to_vec(self) -> Result<Vec<u8>>;
}
impl BytesBufExt for Box<dyn bytes::Buf + 'static + Send> {
fn to_vec(self) -> Result<Vec<u8>> {
use bytes::Buf;
use std::io::Read;
let mut out = Vec::with_capacity(self.remaining());
self.reader().read_to_end(&mut out)?;
Ok(out)
}
}
const FINISH: u64 = 1 << 63;
trait FinishExt: Sized {
fn set_finish(&self) -> Self;
fn unset_finish(&self) -> Self;
fn is_finish(&self) -> bool;
}
impl FinishExt for u64 {
fn set_finish(&self) -> Self {
*self | FINISH
}
fn unset_finish(&self) -> Self {
*self & !FINISH
}
fn is_finish(&self) -> bool {
*self & FINISH > 0
}
}
#[derive(Default)]
struct BytesList(pub std::collections::VecDeque<bytes::Bytes>);
impl BytesList {
pub fn new() -> Self {
Self::default()
}
pub fn push(&mut self, data: bytes::Bytes) {
if bytes::Buf::has_remaining(&data) {
self.0.push_back(data);
}
}
pub fn into_dyn(self) -> Box<dyn bytes::Buf + 'static + Send> {
Box::new(self)
}
}
impl bytes::Buf for BytesList {
fn remaining(&self) -> usize {
self.0.iter().map(|b| b.remaining()).sum()
}
fn chunk(&self) -> &[u8] {
match self.0.get(0) {
Some(b) => b.chunk(),
None => &[],
}
}
#[allow(clippy::comparison_chain)] fn advance(&mut self, mut cnt: usize) {
loop {
let mut item = match self.0.pop_front() {
Some(item) => item,
None => return,
};
let rem = item.remaining();
if rem == cnt {
return;
} else if rem < cnt {
cnt -= rem;
} else if rem > cnt {
item.advance(cnt);
self.0.push_front(item);
return;
}
}
}
}
pub mod state;
mod config;
pub use config::*;
mod endpoint;
pub use endpoint::*;
fn divide_send<B: bytes::Buf>(
config: &dyn Config,
snd_ident: &std::sync::atomic::AtomicU64,
mut data: B,
) -> Result<Vec<BackBuf>> {
use std::io::Write;
let max_send_bytes = config.max_send_bytes();
if bytes::Buf::remaining(&data) > max_send_bytes as usize {
Err(Error::id("DataTooLarge"))
} else {
(|| {
let ident =
snd_ident.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut buf_list = Vec::new();
const MAX_MSG: usize = (16 * 1024) - 8;
while data.has_remaining() {
let loc_len = std::cmp::min(data.remaining(), MAX_MSG);
let ident = if data.remaining() <= loc_len {
ident.set_finish()
} else {
ident.unset_finish()
};
tracing::trace!(ident=%ident.unset_finish(), is_finish=%ident.is_finish(), %loc_len, "prepare send");
let mut tmp =
bytes::Buf::reader(bytes::Buf::take(data, loc_len));
let mut buf = BackBuf::from_writer()?;
buf.write_all(&ident.to_le_bytes())?;
std::io::copy(&mut tmp, &mut buf)?;
buf_list.push(buf.finish());
data = tmp.into_inner().into_inner();
}
if buf_list.is_empty() {
let ident = ident.set_finish();
let mut buf = BackBuf::from_writer()?;
buf.write_all(&ident.to_le_bytes())?;
buf_list.push(buf.finish());
}
Ok(buf_list)
})()
}
}
#[cfg(test)]
mod test;