use std::{ffi::CString, sync::Arc, time};
use futures::{channel::oneshot, FutureExt as _};
use super::{reactor::ReactorHandle, ClientError, RecordSink, Result as ClientResult};
use crate::protocol;
#[derive(Clone)]
pub struct RecordStream(Arc<InnerRecordStream>);
struct InnerRecordStream {
handle: ReactorHandle,
info: protocol::CreateRecordStreamReply,
start_notify: futures::future::Shared<oneshot::Receiver<()>>,
}
impl std::fmt::Debug for RecordStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("RecordStream")
.field(&self.0.info.channel)
.finish()
}
}
impl RecordStream {
pub(super) async fn new(
handle: ReactorHandle,
params: protocol::RecordStreamParams,
sink: impl RecordSink,
) -> Result<Self, ClientError> {
let (start_tx, start_rx) = oneshot::channel();
let info = handle
.insert_record_stream(params, sink, Some(start_tx))
.await?;
Ok(Self(Arc::new(InnerRecordStream {
handle,
info,
start_notify: start_rx.shared(),
})))
}
pub fn channel(&self) -> u32 {
self.0.info.channel
}
pub fn buffer_attr(&self) -> &protocol::stream::BufferAttr {
&self.0.info.buffer_attr
}
pub fn sample_spec(&self) -> &protocol::SampleSpec {
&self.0.info.sample_spec
}
pub fn channel_map(&self) -> &protocol::ChannelMap {
&self.0.info.channel_map
}
pub fn sink(&self) -> u32 {
self.0.info.sink_index
}
pub async fn set_name(&self, name: CString) -> ClientResult<()> {
self.0
.handle
.roundtrip_ack(protocol::Command::SetRecordStreamName(
protocol::SetStreamNameParams {
index: self.0.info.stream_index,
name,
},
))
.await
}
pub async fn timing_info(&self) -> ClientResult<protocol::RecordLatency> {
self.0
.handle
.roundtrip_reply(protocol::Command::GetRecordLatency(
protocol::LatencyParams {
channel: self.0.info.channel,
now: time::SystemTime::now(),
},
))
.await
}
pub async fn cork(&self) -> ClientResult<()> {
self.0
.handle
.roundtrip_ack(protocol::Command::CorkRecordStream(
protocol::CorkStreamParams {
channel: self.0.info.channel,
cork: true,
},
))
.await
}
pub async fn uncork(&self) -> ClientResult<()> {
self.0
.handle
.roundtrip_ack(protocol::Command::CorkRecordStream(
protocol::CorkStreamParams {
channel: self.0.info.channel,
cork: false,
},
))
.await
}
pub async fn started(&self) -> ClientResult<()> {
self.0
.start_notify
.clone()
.await
.map_err(|_| ClientError::Disconnected)
}
pub async fn flush(&self) -> super::Result<()> {
self.0
.handle
.roundtrip_ack(protocol::Command::FlushRecordStream(self.0.info.channel))
.await
}
pub async fn delete(self) -> ClientResult<()> {
self.0
.handle
.delete_record_stream(self.0.info.channel)
.await
}
}
impl Drop for InnerRecordStream {
fn drop(&mut self) {
let _ = self
.handle
.delete_record_stream(self.info.channel)
.now_or_never();
}
}