use std::ffi::CString;
use std::sync::Arc;
use std::time;
use futures::channel::oneshot;
use futures::FutureExt as _;
use super::reactor::ReactorHandle;
use super::{ClientError, PlaybackSource, Result as ClientResult};
use crate::protocol;
/// A handle to a playback stream.
///
/// The handle is a thin wrapper around a reference-counted
/// `InnerPlaybackStream`, so cloning it only bumps the reference count and
/// every clone refers to the same underlying stream state.
#[derive(Clone)]
pub struct PlaybackStream(Arc<InnerPlaybackStream>);
/// State shared by every clone of a `PlaybackStream`.
struct InnerPlaybackStream {
/// Reactor handle through which all of this stream's commands are issued.
handle: ReactorHandle,
/// The reply returned when the stream was created; the accessors on
/// `PlaybackStream` read the negotiated parameters from here.
info: protocol::CreatePlaybackStreamReply,
/// Shared receiving half of the oneshot channel whose sender is handed to
/// the reactor at creation time. Shared so that any number of callers can
/// await it. NOTE(review): presumably signalled when the playback source
/// reaches EOF — confirm against the reactor.
eof_notify: futures::future::Shared<oneshot::Receiver<()>>,
}
impl std::fmt::Debug for PlaybackStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("PlaybackStream")
.field(&self.0.info.channel)
.finish()
}
}
impl PlaybackStream {
    /// Creates a playback stream by registering `source` with the reactor.
    ///
    /// `params` is forwarded to the reactor's `insert_playback_stream`, and
    /// the reply it yields is kept so the accessors below can report the
    /// stream's parameters.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `insert_playback_stream`.
    pub(super) async fn new(
        handle: ReactorHandle,
        params: protocol::PlaybackStreamParams,
        source: impl PlaybackSource,
    ) -> Result<Self, ClientError> {
        // The sender goes to the reactor; `source_eof` awaits the receiver.
        let (eof_tx, eof_rx) = oneshot::channel();
        let info = handle
            .insert_playback_stream(params, source, Some(eof_tx))
            .await?;

        Ok(Self(Arc::new(InnerPlaybackStream {
            handle,
            info,
            // Shared so that every clone (and every caller) can await EOF.
            eof_notify: eof_rx.shared(),
        })))
    }

    /// The channel number from the stream creation reply. Every command
    /// issued by this handle addresses the stream by this value.
    pub fn channel(&self) -> u32 {
        self.0.info.channel
    }

    /// The buffer attributes reported in the stream creation reply.
    pub fn buffer_attr(&self) -> &protocol::stream::BufferAttr {
        &self.0.info.buffer_attr
    }

    /// The sample specification reported in the stream creation reply.
    pub fn sample_spec(&self) -> &protocol::SampleSpec {
        &self.0.info.sample_spec
    }

    /// The channel map reported in the stream creation reply.
    pub fn channel_map(&self) -> &protocol::ChannelMap {
        &self.0.info.channel_map
    }

    /// The index of the sink reported in the stream creation reply.
    pub fn sink(&self) -> u32 {
        self.0.info.sink_index
    }

    /// Renames the playback stream and waits for the acknowledgement.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the reactor round trip, if any.
    pub async fn set_name(&self, name: CString) -> ClientResult<()> {
        self.0
            .handle
            .roundtrip_ack(protocol::Command::SetPlaybackStreamName(
                protocol::SetStreamNameParams {
                    // Stream commands address the stream by its channel, as
                    // every other command in this impl does. `stream_index`
                    // is a different identifier from the creation reply and
                    // must not be used here.
                    index: self.0.info.channel,
                    name,
                },
            ))
            .await
    }

    /// Requests latency information for the stream, stamping the request
    /// with the current system time.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the reactor round trip, if any.
    pub async fn timing_info(&self) -> ClientResult<protocol::PlaybackLatency> {
        self.0
            .handle
            .roundtrip_reply(protocol::Command::GetPlaybackLatency(
                protocol::LatencyParams {
                    channel: self.0.info.channel,
                    now: time::SystemTime::now(),
                },
            ))
            .await
    }

    /// Corks the stream and waits for the acknowledgement.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the reactor round trip, if any.
    pub async fn cork(&self) -> ClientResult<()> {
        self.set_corked(true).await
    }

    /// Uncorks the stream and waits for the acknowledgement.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the reactor round trip, if any.
    pub async fn uncork(&self) -> ClientResult<()> {
        self.set_corked(false).await
    }

    /// Sends a `CorkPlaybackStream` command carrying `cork` for this
    /// stream's channel and waits for the acknowledgement.
    async fn set_corked(&self, cork: bool) -> ClientResult<()> {
        self.0
            .handle
            .roundtrip_ack(protocol::Command::CorkPlaybackStream(
                protocol::CorkStreamParams {
                    channel: self.0.info.channel,
                    cork,
                },
            ))
            .await
    }

    /// Waits until the end-of-source notification fires.
    ///
    /// May be awaited any number of times and from any clone of the handle.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::Disconnected`] if the sending half was dropped
    /// without ever signalling.
    pub async fn source_eof(&self) -> ClientResult<()> {
        self.0
            .eof_notify
            .clone()
            .await
            .map_err(|_| ClientError::Disconnected)
    }

    /// Waits for the end-of-source notification and then drains the stream.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by [`Self::source_eof`] or
    /// [`Self::drain`].
    pub async fn play_all(&self) -> ClientResult<()> {
        self.source_eof().await?;
        self.drain().await
    }

    /// Marks the stream as draining in the reactor, then sends a
    /// `DrainPlaybackStream` command and waits for the acknowledgement.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the reactor round trip, if any.
    pub async fn drain(&self) -> ClientResult<()> {
        let channel = self.0.info.channel;

        // The reactor is told first, before the drain command is sent.
        self.0.handle.mark_playback_stream_draining(channel);
        self.0
            .handle
            .roundtrip_ack(protocol::Command::DrainPlaybackStream(channel))
            .await
    }

    /// Sends a `FlushPlaybackStream` command and waits for the
    /// acknowledgement.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the reactor round trip, if any.
    pub async fn flush(&self) -> ClientResult<()> {
        self.0
            .handle
            .roundtrip_ack(protocol::Command::FlushPlaybackStream(self.0.info.channel))
            .await
    }

    /// Consumes this handle and asks the reactor to delete the stream,
    /// waiting for the operation to complete.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `delete_playback_stream`, if any.
    pub async fn delete(self) -> ClientResult<()> {
        self.0
            .handle
            .delete_playback_stream(self.0.info.channel)
            .await
    }
}
impl Drop for InnerPlaybackStream {
    /// Makes a best-effort attempt to delete the stream when the last handle
    /// goes away.
    fn drop(&mut self) {
        let channel = self.info.channel;
        let pending_delete = self.handle.delete_playback_stream(channel);

        // `drop` cannot await, so the future is polled exactly once and its
        // outcome (finished or not, success or failure) is discarded.
        let _ = pending_delete.now_or_never();
    }
}