use std::borrow::Cow;
use std::str::FromStr;
use std::string::ToString;
use std::io::{Read, Write};
use std::rc::Rc;
use serde_json;
use cast::proxies;
use errors::Error;
use message_manager::{CastMessage, CastMessagePayload, MessageManager};
// Namespace placed on every message this channel sends; `can_handle` accepts only
// incoming messages that carry the same namespace.
const CHANNEL_NAMESPACE: &str = "urn:x-cast:com.google.cast.media";
// Values written to the `typ` field of the requests built by this channel.
const MESSAGE_TYPE_GET_STATUS: &str = "GET_STATUS";
const MESSAGE_TYPE_LOAD: &str = "LOAD";
const MESSAGE_TYPE_PLAY: &str = "PLAY";
const MESSAGE_TYPE_PAUSE: &str = "PAUSE";
const MESSAGE_TYPE_STOP: &str = "STOP";
const MESSAGE_TYPE_SEEK: &str = "SEEK";
// Values of the reply's `type` property that `parse` recognizes; any other value is
// reported as `MediaResponse::NotImplemented`.
const MESSAGE_TYPE_MEDIA_STATUS: &str = "MEDIA_STATUS";
const MESSAGE_TYPE_LOAD_CANCELLED: &str = "LOAD_CANCELLED";
const MESSAGE_TYPE_LOAD_FAILED: &str = "LOAD_FAILED";
const MESSAGE_TYPE_INVALID_PLAYER_STATE: &str = "INVALID_PLAYER_STATE";
const MESSAGE_TYPE_INVALID_REQUEST: &str = "INVALID_REQUEST";
/// Stream type of a media item.
///
/// The variants are plain tags, so the type is cheap to copy and can be compared
/// directly (`entry_type == StreamType::Live`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StreamType {
    /// Written as `"NONE"` when converted to a string.
    None,
    /// Written as `"BUFFERED"` when converted to a string.
    Buffered,
    /// Written as `"LIVE"` when converted to a string.
    Live,
}
impl FromStr for StreamType {
    type Err = Error;

    /// Parses a stream type name.
    ///
    /// `"BUFFERED"`/`"buffered"` and `"LIVE"`/`"live"` select the matching variant;
    /// every other input (including the empty string) yields `StreamType::None`.
    /// This conversion never returns an error.
    fn from_str(s: &str) -> Result<StreamType, Error> {
        let stream_type = match s {
            "BUFFERED" | "buffered" => StreamType::Buffered,
            "LIVE" | "live" => StreamType::Live,
            // Anything unrecognized is deliberately treated as "no stream type".
            _ => StreamType::None,
        };
        Ok(stream_type)
    }
}
/// Formats the stream type as its upper-case name (`NONE`, `BUFFERED`, `LIVE`).
///
/// `Display` is implemented instead of `ToString`; the standard library's blanket
/// `impl<T: Display> ToString for T` keeps `stream_type.to_string()` working and
/// additionally allows the value to be used with `format!("{}", ..)`.
impl ::std::fmt::Display for StreamType {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        let stream_type = match *self {
            StreamType::None => "NONE",
            StreamType::Buffered => "BUFFERED",
            StreamType::Live => "LIVE",
        };
        f.write_str(stream_type)
    }
}
/// Player state reported in a media status entry.
///
/// The variants are plain tags, so the type is cheap to copy and can be compared
/// directly (`entry.player_state == PlayerState::Playing`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PlayerState {
    /// Corresponds to the string `"IDLE"`.
    Idle,
    /// Corresponds to the string `"PLAYING"`.
    Playing,
    /// Corresponds to the string `"BUFFERING"`.
    Buffering,
    /// Corresponds to the string `"PAUSED"`.
    Paused,
}
impl FromStr for PlayerState {
type Err = Error;
fn from_str(s: &str) -> Result<PlayerState, Error> {
match s {
"IDLE" => Ok(PlayerState::Idle),
"PLAYING" => Ok(PlayerState::Playing),
"BUFFERING" => Ok(PlayerState::Buffering),
"PAUSED" => Ok(PlayerState::Paused),
_ => Err(Error::Internal(format!("Unknown player state {}", s))),
}
}
}
/// Formats the player state as its upper-case name
/// (`IDLE`, `PLAYING`, `BUFFERING`, `PAUSED`).
///
/// `Display` is implemented instead of `ToString`; the standard library's blanket
/// `impl<T: Display> ToString for T` keeps `state.to_string()` working and
/// additionally allows the value to be used with `format!("{}", ..)`.
impl ::std::fmt::Display for PlayerState {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        let player_state = match *self {
            PlayerState::Idle => "IDLE",
            PlayerState::Playing => "PLAYING",
            PlayerState::Buffering => "BUFFERING",
            PlayerState::Paused => "PAUSED",
        };
        f.write_str(player_state)
    }
}
/// Idle reason optionally reported in a media status entry.
///
/// The variants are plain tags, so the type is cheap to copy and can be compared
/// directly (`reason == IdleReason::Finished`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum IdleReason {
    /// Corresponds to the string `"CANCELLED"`.
    Cancelled,
    /// Corresponds to the string `"INTERRUPTED"`.
    Interrupted,
    /// Corresponds to the string `"FINISHED"`.
    Finished,
    /// Corresponds to the string `"ERROR"`.
    Error,
}
impl FromStr for IdleReason {
type Err = Error;
fn from_str(s: &str) -> Result<IdleReason, Error> {
match s {
"CANCELLED" => Ok(IdleReason::Cancelled),
"INTERRUPTED" => Ok(IdleReason::Interrupted),
"FINISHED" => Ok(IdleReason::Finished),
"ERROR" => Ok(IdleReason::Error),
_ => Err(Error::Internal(format!("Unknown idle reason {}", s))),
}
}
}
/// Resume state that can accompany a seek request.
///
/// The variants are plain tags, so the type is cheap to copy and can be compared
/// directly.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ResumeState {
    /// Written as `"PLAYBACK_START"` when converted to a string.
    PlaybackStart,
    /// Written as `"PLAYBACK_PAUSE"` when converted to a string.
    PlaybackPause,
}
impl FromStr for ResumeState {
type Err = Error;
fn from_str(s: &str) -> Result<ResumeState, Error> {
match s {
"PLAYBACK_START" | "start" => Ok(ResumeState::PlaybackStart),
"PLAYBACK_PAUSE" | "pause" => Ok(ResumeState::PlaybackPause),
_ => Err(Error::Internal(format!("Unknown resume state {}", s))),
}
}
}
/// Formats the resume state as `PLAYBACK_START` or `PLAYBACK_PAUSE`.
///
/// `Display` is implemented instead of `ToString`; the standard library's blanket
/// `impl<T: Display> ToString for T` keeps `state.to_string()` working and
/// additionally allows the value to be used with `format!("{}", ..)`.
impl ::std::fmt::Display for ResumeState {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        let resume_state = match *self {
            ResumeState::PlaybackStart => "PLAYBACK_START",
            ResumeState::PlaybackPause => "PLAYBACK_PAUSE",
        };
        f.write_str(resume_state)
    }
}
/// Description of a media item, used both when requesting a load and when reporting
/// the media attached to a status entry.
#[derive(Debug)]
pub struct Media {
/// Identifier of the content; `load` compares it against status entries to
/// recognize the media it requested.
pub content_id: String,
/// Stream type of the content.
pub stream_type: StreamType,
/// Content type string, passed through unchanged (presumably a MIME type — confirm).
pub content_type: String,
/// Optional duration, passed through unchanged (presumably seconds — confirm).
pub duration: Option<f32>,
}
/// Parsed `MEDIA_STATUS` reply.
#[derive(Debug)]
pub struct Status {
/// Request id carried by the reply; used to pair the reply with the request that
/// triggered it.
pub request_id: i32,
/// One entry per status item contained in the reply.
pub entries: Vec<StatusEntry>,
}
/// A single status item from a `MEDIA_STATUS` reply.
#[derive(Debug)]
pub struct StatusEntry {
/// Id of the media session this entry describes.
pub media_session_id: i32,
/// Media attached to the session, when the reply included it.
pub media: Option<Media>,
/// Playback rate as reported in the reply.
pub playback_rate: f32,
/// Player state as reported in the reply.
pub player_state: PlayerState,
/// Idle reason, when the reply included one.
pub idle_reason: Option<IdleReason>,
/// Current playback position, when reported (presumably seconds — confirm).
pub current_time: Option<f32>,
/// Supported media commands as reported in the reply
/// (presumably a bit mask — confirm against the protocol documentation).
pub supported_media_commands: u8,
}
/// Parsed `LOAD_CANCELLED` reply.
#[derive(Debug)]
pub struct LoadCancelled {
/// Request id carried by the reply.
pub request_id: i32,
}
/// Parsed `LOAD_FAILED` reply.
#[derive(Debug)]
pub struct LoadFailed {
/// Request id carried by the reply.
pub request_id: i32,
}
/// Parsed `INVALID_PLAYER_STATE` reply.
#[derive(Debug)]
pub struct InvalidPlayerState {
/// Request id carried by the reply.
pub request_id: i32,
}
/// Parsed `INVALID_REQUEST` reply.
#[derive(Debug)]
pub struct InvalidRequest {
/// Request id carried by the reply.
pub request_id: i32,
/// Reason given in the reply, if any; callers in this file fall back to
/// `"Unknown"` when it is absent.
pub reason: Option<String>,
}
/// Every kind of reply that `MediaChannel::parse` can produce.
#[derive(Debug)]
pub enum MediaResponse {
/// A `MEDIA_STATUS` reply.
Status(Status),
/// A `LOAD_CANCELLED` reply.
LoadCancelled(LoadCancelled),
/// A `LOAD_FAILED` reply.
LoadFailed(LoadFailed),
/// An `INVALID_PLAYER_STATE` reply.
InvalidPlayerState(InvalidPlayerState),
/// An `INVALID_REQUEST` reply.
InvalidRequest(InvalidRequest),
/// Any other reply: the value of its `type` property (empty when missing) together
/// with the complete parsed JSON document.
NotImplemented(String, serde_json::Value),
}
/// Channel for exchanging messages in the `urn:x-cast:com.google.cast.media`
/// namespace through a shared `MessageManager`.
pub struct MediaChannel<'a, W>
where
W: Read + Write,
{
/// Identifier placed in the `source` field of every outgoing message.
sender: Cow<'a, str>,
/// Shared manager used to generate request ids and to send and receive messages.
message_manager: Rc<MessageManager<W>>,
}
impl<'a, W> MediaChannel<'a, W>
where
W: Read + Write,
{
/// Creates a media channel.
///
/// `sender` becomes the `source` of every message sent through this channel and
/// `message_manager` is the shared transport used for sending and receiving.
pub fn new<S>(sender: S, message_manager: Rc<MessageManager<W>>) -> MediaChannel<'a, W>
where
    S: Into<Cow<'a, str>>,
{
    let sender: Cow<'a, str> = sender.into();
    MediaChannel {
        sender,
        message_manager,
    }
}
pub fn get_status<S>(
&self,
destination: S,
media_session_id: Option<i32>,
) -> Result<Status, Error>
where
S: Into<Cow<'a, str>>,
{
let request_id = self.message_manager.generate_request_id();
let payload = serde_json::to_string(&proxies::media::GetStatusRequest {
typ: MESSAGE_TYPE_GET_STATUS.to_string(),
request_id: request_id,
media_session_id: media_session_id,
})?;
self.message_manager.send(CastMessage {
namespace: CHANNEL_NAMESPACE.to_string(),
source: self.sender.to_string(),
destination: destination.into().to_string(),
payload: CastMessagePayload::String(payload),
})?;
self.message_manager.receive_find_map(|message| {
if !self.can_handle(message) {
return Ok(None);
}
match self.parse(message)? {
MediaResponse::Status(status) => if status.request_id == request_id {
return Ok(Some(status));
},
MediaResponse::InvalidRequest(error) => if error.request_id == request_id {
return Err(Error::Internal(format!(
"Invalid request ({}).",
error.reason.unwrap_or_else(|| "Unknown".to_string())
)));
},
_ => {}
}
Ok(None)
})
}
pub fn load<S>(&self, destination: S, session_id: S, media: &Media) -> Result<Status, Error>
where
S: Into<Cow<'a, str>>,
{
let request_id = self.message_manager.generate_request_id();
let payload = serde_json::to_string(&proxies::media::MediaRequest {
request_id: request_id,
session_id: session_id.into().to_string(),
typ: MESSAGE_TYPE_LOAD.to_string(),
media: proxies::media::Media {
content_id: media.content_id.clone(),
stream_type: media.stream_type.to_string(),
content_type: media.content_type.clone(),
duration: media.duration,
},
current_time: 0_f64,
autoplay: true,
custom_data: proxies::media::CustomData::new(),
})?;
self.message_manager.send(CastMessage {
namespace: CHANNEL_NAMESPACE.to_string(),
source: self.sender.to_string(),
destination: destination.into().to_string(),
payload: CastMessagePayload::String(payload),
})?;
self.message_manager.receive_find_map(|message| {
if !self.can_handle(message) {
return Ok(None);
}
match self.parse(message)? {
MediaResponse::Status(status) => {
if status.request_id == request_id {
return Ok(Some(status));
}
let has_media = {
status.entries.iter().any(|entry| {
if let Some(ref loaded_media) = entry.media {
return loaded_media.content_id == media.content_id;
}
false
})
};
if has_media {
return Ok(Some(status));
}
}
MediaResponse::LoadFailed(error) => if error.request_id == request_id {
return Err(Error::Internal("Failed to load media.".to_string()));
},
MediaResponse::LoadCancelled(error) => if error.request_id == request_id {
return Err(Error::Internal(
"Load cancelled by another request.".to_string(),
));
},
MediaResponse::InvalidPlayerState(error) => if error.request_id == request_id {
return Err(Error::Internal(
"Load failed because of invalid player state.".to_string(),
));
},
_ => {}
}
Ok(None)
})
}
/// Sends a `PAUSE` request for `media_session_id` to `destination` and waits for
/// the status entry of that session in the reply to this request.
///
/// # Errors
///
/// Propagates serialization and send failures, plus whatever
/// `receive_status_entry` reports while waiting for the reply.
pub fn pause<S>(&self, destination: S, media_session_id: i32) -> Result<StatusEntry, Error>
where
    S: Into<Cow<'a, str>>,
{
    let request_id = self.message_manager.generate_request_id();

    let request = proxies::media::PlaybackGenericRequest {
        request_id,
        media_session_id,
        typ: MESSAGE_TYPE_PAUSE.to_string(),
        custom_data: proxies::media::CustomData::new(),
    };
    let payload = serde_json::to_string(&request)?;

    let destination: Cow<'a, str> = destination.into();
    let outgoing = CastMessage {
        namespace: CHANNEL_NAMESPACE.to_string(),
        source: self.sender.to_string(),
        destination: destination.into_owned(),
        payload: CastMessagePayload::String(payload),
    };
    self.message_manager.send(outgoing)?;

    self.receive_status_entry(request_id, media_session_id)
}
/// Sends a `PLAY` request for `media_session_id` to `destination` and waits for
/// the status entry of that session in the reply to this request.
///
/// # Errors
///
/// Propagates serialization and send failures, plus whatever
/// `receive_status_entry` reports while waiting for the reply.
pub fn play<S>(&self, destination: S, media_session_id: i32) -> Result<StatusEntry, Error>
where
    S: Into<Cow<'a, str>>,
{
    let request_id = self.message_manager.generate_request_id();

    let request = proxies::media::PlaybackGenericRequest {
        request_id,
        media_session_id,
        typ: MESSAGE_TYPE_PLAY.to_string(),
        custom_data: proxies::media::CustomData::new(),
    };
    let payload = serde_json::to_string(&request)?;

    let destination: Cow<'a, str> = destination.into();
    let outgoing = CastMessage {
        namespace: CHANNEL_NAMESPACE.to_string(),
        source: self.sender.to_string(),
        destination: destination.into_owned(),
        payload: CastMessagePayload::String(payload),
    };
    self.message_manager.send(outgoing)?;

    self.receive_status_entry(request_id, media_session_id)
}
/// Sends a `STOP` request for `media_session_id` to `destination` and waits for
/// the status entry of that session in the reply to this request.
///
/// # Errors
///
/// Propagates serialization and send failures, plus whatever
/// `receive_status_entry` reports while waiting for the reply.
pub fn stop<S>(&self, destination: S, media_session_id: i32) -> Result<StatusEntry, Error>
where
    S: Into<Cow<'a, str>>,
{
    let request_id = self.message_manager.generate_request_id();

    let request = proxies::media::PlaybackGenericRequest {
        request_id,
        media_session_id,
        typ: MESSAGE_TYPE_STOP.to_string(),
        custom_data: proxies::media::CustomData::new(),
    };
    let payload = serde_json::to_string(&request)?;

    let destination: Cow<'a, str> = destination.into();
    let outgoing = CastMessage {
        namespace: CHANNEL_NAMESPACE.to_string(),
        source: self.sender.to_string(),
        destination: destination.into_owned(),
        payload: CastMessagePayload::String(payload),
    };
    self.message_manager.send(outgoing)?;

    self.receive_status_entry(request_id, media_session_id)
}
/// Sends a `SEEK` request for `media_session_id` to `destination` and waits for
/// the status entry of that session in the reply to this request.
///
/// `current_time` is forwarded unchanged; `resume_state`, when present, is sent as
/// its string form.
///
/// # Errors
///
/// Propagates serialization and send failures, plus whatever
/// `receive_status_entry` reports while waiting for the reply.
pub fn seek<S>(
    &self,
    destination: S,
    media_session_id: i32,
    current_time: Option<f32>,
    resume_state: Option<ResumeState>,
) -> Result<StatusEntry, Error>
where
    S: Into<Cow<'a, str>>,
{
    let request_id = self.message_manager.generate_request_id();

    let request = proxies::media::PlaybackSeekRequest {
        request_id,
        media_session_id,
        typ: MESSAGE_TYPE_SEEK.to_string(),
        current_time,
        resume_state: resume_state.map(|state| state.to_string()),
        custom_data: proxies::media::CustomData::new(),
    };
    let payload = serde_json::to_string(&request)?;

    let destination: Cow<'a, str> = destination.into();
    let outgoing = CastMessage {
        namespace: CHANNEL_NAMESPACE.to_string(),
        source: self.sender.to_string(),
        destination: destination.into_owned(),
        payload: CastMessagePayload::String(payload),
    };
    self.message_manager.send(outgoing)?;

    self.receive_status_entry(request_id, media_session_id)
}
/// Returns `true` when `message` carries the media namespace handled by this channel.
pub fn can_handle(&self, message: &CastMessage) -> bool {
    CHANNEL_NAMESPACE == message.namespace
}
pub fn parse(&self, message: &CastMessage) -> Result<MediaResponse, Error> {
let reply = match message.payload {
CastMessagePayload::String(ref payload) => {
serde_json::from_str::<serde_json::Value>(payload)?
}
_ => {
return Err(Error::Internal(
"Binary payload is not supported!".to_string(),
))
}
};
let message_type = reply
.as_object()
.and_then(|object| object.get("type"))
.and_then(|property| property.as_str())
.unwrap_or("")
.to_string();
let response = match message_type.as_ref() {
MESSAGE_TYPE_MEDIA_STATUS => {
let reply: proxies::media::StatusReply = serde_json::value::from_value(reply)?;
let statuses_entries = reply.status.iter().map(|x| {
StatusEntry {
media_session_id: x.media_session_id,
media: x.media.as_ref().map(|m| {
Media {
content_id: m.content_id.to_string(),
stream_type: StreamType::from_str(m.stream_type.as_ref()).unwrap(),
content_type: m.content_type.to_string(),
duration: m.duration,
}
}),
playback_rate: x.playback_rate,
player_state: PlayerState::from_str(x.player_state.as_ref()).unwrap(),
idle_reason: x.idle_reason
.as_ref()
.map(|reason| IdleReason::from_str(reason).unwrap()),
current_time: x.current_time,
supported_media_commands: x.supported_media_commands,
}
});
MediaResponse::Status(Status {
request_id: reply.request_id,
entries: statuses_entries.collect::<Vec<StatusEntry>>(),
})
}
MESSAGE_TYPE_LOAD_CANCELLED => {
let reply: proxies::media::LoadCancelledReply =
serde_json::value::from_value(reply)?;
MediaResponse::LoadCancelled(LoadCancelled {
request_id: reply.request_id,
})
}
MESSAGE_TYPE_LOAD_FAILED => {
let reply: proxies::media::LoadFailedReply = serde_json::value::from_value(reply)?;
MediaResponse::LoadFailed(LoadFailed {
request_id: reply.request_id,
})
}
MESSAGE_TYPE_INVALID_PLAYER_STATE => {
let reply: proxies::media::InvalidPlayerStateReply =
serde_json::value::from_value(reply)?;
MediaResponse::InvalidPlayerState(InvalidPlayerState {
request_id: reply.request_id,
})
}
MESSAGE_TYPE_INVALID_REQUEST => {
let reply: proxies::media::InvalidRequestReply =
serde_json::value::from_value(reply)?;
MediaResponse::InvalidRequest(InvalidRequest {
request_id: reply.request_id,
reason: reply.reason,
})
}
_ => MediaResponse::NotImplemented(message_type.to_string(), reply),
};
Ok(response)
}
fn receive_status_entry(
&self,
request_id: i32,
media_session_id: i32,
) -> Result<StatusEntry, Error> {
self.message_manager.receive_find_map(|message| {
if !self.can_handle(message) {
return Ok(None);
}
match self.parse(message)? {
MediaResponse::Status(mut status) => if status.request_id == request_id {
let position = status
.entries
.iter()
.position(|e| e.media_session_id == media_session_id);
return Ok(position.and_then(|position| Some(status.entries.remove(position))));
},
MediaResponse::InvalidPlayerState(error) => if error.request_id == request_id {
return Err(Error::Internal(
"Request failed because of invalid player state.".to_string(),
));
},
MediaResponse::InvalidRequest(error) => if error.request_id == request_id {
return Err(Error::Internal(format!(
"Invalid request ({}).",
error.reason.unwrap_or_else(|| "Unknown".to_string())
)));
},
_ => {}
}
Ok(None)
})
}
}