#[cfg(test)]
use std::sync::Arc;
#[cfg(test)]
use bytes::Bytes;
use livekit::{ByteStreamWriter, StreamWriter, id::ParticipantIdentity};
use crate::remote_access::RemoteAccessError;
type Result<T> = std::result::Result<T, Box<RemoteAccessError>>;
pub(crate) struct Participant {
identity: ParticipantIdentity,
writer: ParticipantWriter,
}
pub(crate) struct ChannelWriter {
inner: ChannelWriterInner,
version: u32,
}
impl ChannelWriter {
pub fn new(writer: ByteStreamWriter, version: u32) -> Self {
Self {
inner: ChannelWriterInner::Livekit(writer),
version,
}
}
#[cfg(test)]
pub fn test(writer: Arc<TestChannelWriter>, version: u32) -> Self {
Self {
inner: ChannelWriterInner::Test(writer),
version,
}
}
pub fn version(&self) -> u32 {
self.version
}
pub async fn write(&self, bytes: &[u8]) -> Result<()> {
self.inner.write(bytes).await
}
}
enum ChannelWriterInner {
Livekit(ByteStreamWriter),
#[allow(dead_code)]
#[cfg(test)]
Test(Arc<TestChannelWriter>),
}
impl ChannelWriterInner {
async fn write(&self, bytes: &[u8]) -> Result<()> {
match self {
ChannelWriterInner::Livekit(stream) => stream.write(bytes).await.map_err(|e| e.into()),
#[cfg(test)]
ChannelWriterInner::Test(writer) => writer.write(bytes),
}
}
}
impl Participant {
pub fn new(identity: ParticipantIdentity, writer: ParticipantWriter) -> Self {
Self { identity, writer }
}
pub fn identity(&self) -> &ParticipantIdentity {
&self.identity
}
pub(crate) async fn send(&self, bytes: &[u8]) -> Result<()> {
self.writer.write(bytes).await
}
}
impl std::fmt::Debug for Participant {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Participant")
.field("identity", &self.identity)
.finish()
}
}
impl std::fmt::Display for Participant {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Participant({})", self.identity)
}
}
pub(crate) enum ParticipantWriter {
Livekit(ByteStreamWriter),
#[allow(dead_code)]
#[cfg(test)]
Test(Arc<TestByteStreamWriter>),
}
impl ParticipantWriter {
async fn write(&self, bytes: &[u8]) -> Result<()> {
match self {
ParticipantWriter::Livekit(stream) => stream.write(bytes).await.map_err(|e| e.into()),
#[cfg(test)]
ParticipantWriter::Test(writer) => {
writer.record(bytes);
Ok(())
}
}
}
}
#[cfg(test)]
#[derive(Default)]
pub(crate) struct TestByteStreamWriter {
writes: parking_lot::Mutex<Vec<Bytes>>,
}
#[cfg(test)]
impl TestByteStreamWriter {
fn record(&self, data: &[u8]) {
self.writes.lock().push(Bytes::copy_from_slice(data));
}
#[allow(dead_code)]
pub(crate) fn writes(&self) -> Vec<Bytes> {
std::mem::take(&mut self.writes.lock())
}
}
#[cfg(test)]
pub(crate) struct TestChannelWriter {
writes: parking_lot::Mutex<Vec<Bytes>>,
fail: std::sync::atomic::AtomicBool,
}
#[cfg(test)]
impl Default for TestChannelWriter {
fn default() -> Self {
Self {
writes: parking_lot::Mutex::new(Vec::new()),
fail: std::sync::atomic::AtomicBool::new(false),
}
}
}
#[cfg(test)]
impl TestChannelWriter {
pub fn new_failing() -> Self {
Self {
writes: parking_lot::Mutex::new(Vec::new()),
fail: std::sync::atomic::AtomicBool::new(true),
}
}
fn write(&self, data: &[u8]) -> Result<()> {
if self.fail.load(std::sync::atomic::Ordering::Relaxed) {
return Err(Box::new(RemoteAccessError::Io(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"test write failure",
))));
}
self.writes.lock().push(Bytes::copy_from_slice(data));
Ok(())
}
pub fn writes(&self) -> Vec<Bytes> {
self.writes.lock().clone()
}
}