use std::{sync::Arc, time::Instant};
use hyper::Method;
use serde_json::{Map as JsonMap, Value as JsonValue};
use crate::{
AppState, PLAYBACK_INFO_LOGGER_DOMAIN, api::PlaybackInfo,
core::frontend::types::InfuseAuthorization, debug_log, info_log,
network::HttpMethod, util::StringUtil, warn_log,
};
const SLOW_PLAYBACK_INFO_FETCH_THRESHOLD_MS: u128 = 500;
const PLAYBACK_INFO_CACHE_KEY_PREFIX: &str = "playback:info";
const PLAYBACK_INFO_METHOD_SEGMENT: &str = "method";
const PLAYBACK_INFO_ITEM_ID_SEGMENT: &str = "item_id";
const PLAYBACK_INFO_MEDIA_SOURCE_ID_SEGMENT: &str = "media_source_id";
const PLAYBACK_INFO_CONTENT_TYPE_HASH_SEGMENT: &str = "content_type_hash";
const PLAYBACK_INFO_BODY_HASH_SEGMENT: &str = "body_hash";
const PLAYBACK_INFO_ITEMS_SEGMENT: &str = "Items";
const PLAYBACK_INFO_PATH_SEGMENT: &str = "PlaybackInfo";
const PLAYBACK_INFO_MEDIA_SOURCE_ID_QUERY_KEY: &str = "MediaSourceId";
const CONTENT_TYPE_JSON: &str = "application/json";
const CONTENT_TYPE_FORM_URLENCODED: &str = "application/x-www-form-urlencoded";
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PlaybackInfoRequest {
pub item_id: String,
pub media_source_id: String,
pub method: HttpMethod,
pub body: Option<Vec<u8>>,
pub content_type: Option<String>,
}
impl PlaybackInfoRequest {
pub fn new(
item_id: impl Into<String>,
media_source_id: impl Into<String>,
method: HttpMethod,
body: Option<Vec<u8>>,
content_type: Option<String>,
) -> Self {
Self {
item_id: item_id.into(),
media_source_id: media_source_id.into(),
method,
body,
content_type,
}
}
pub fn cache_key(&self) -> Result<String, PlaybackInfoServiceError> {
let item_id = self.item_id.trim();
if item_id.is_empty() {
return Err(PlaybackInfoServiceError::InvalidItemId);
}
let media_source_id = self.media_source_id.trim();
if media_source_id.is_empty() {
return Err(PlaybackInfoServiceError::InvalidMediaSourceId);
}
let method = self.method.to_string().to_ascii_lowercase();
let mut key = format!(
"{PLAYBACK_INFO_CACHE_KEY_PREFIX}:{PLAYBACK_INFO_METHOD_SEGMENT}:{method}:{PLAYBACK_INFO_ITEM_ID_SEGMENT}:{}:{PLAYBACK_INFO_MEDIA_SOURCE_ID_SEGMENT}:{}",
item_id.to_ascii_lowercase(),
media_source_id.to_ascii_lowercase()
);
if matches!(self.method, HttpMethod::Post) {
let body_hash = self
.body
.as_deref()
.map(|body| Self::body_hash(self.content_type.as_deref(), body))
.unwrap_or_default();
let content_type_hash = self
.content_type
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.map(|value| StringUtil::hash_hex(&value.to_ascii_lowercase()))
.unwrap_or_default();
key.push_str(&format!(
":{PLAYBACK_INFO_CONTENT_TYPE_HASH_SEGMENT}:{content_type_hash}:{PLAYBACK_INFO_BODY_HASH_SEGMENT}:{body_hash}"
));
}
Ok(key)
}
pub fn from_http_parts(
path: &str,
query: Option<&str>,
method: &Method,
body: Option<&[u8]>,
content_type: Option<&str>,
) -> Result<Self, PlaybackInfoServiceError> {
let item_id = Self::item_id_from_path(path)
.ok_or(PlaybackInfoServiceError::InvalidItemId)?;
let media_source_id = Self::media_source_id_from_query(query)
.or_else(|| Self::media_source_id_from_body(body, content_type))
.ok_or(PlaybackInfoServiceError::InvalidMediaSourceId)?;
let method = match *method {
Method::GET => HttpMethod::Get,
Method::POST => HttpMethod::Post,
_ => return Err(PlaybackInfoServiceError::UnsupportedMethod),
};
Ok(Self::new(
item_id,
media_source_id,
method,
body.map(|bytes| bytes.to_vec()),
content_type.map(str::to_string),
))
}
fn item_id_from_path(path: &str) -> Option<String> {
let segments: Vec<&str> = path
.split('/')
.filter(|segment| !segment.is_empty())
.collect();
segments
.windows(3)
.find(|window| {
window.first().is_some_and(|segment| {
segment.eq_ignore_ascii_case(PLAYBACK_INFO_ITEMS_SEGMENT)
}) && window.get(2).is_some_and(|segment| {
segment.eq_ignore_ascii_case(PLAYBACK_INFO_PATH_SEGMENT)
})
})
.and_then(|window| window.get(1))
.map(|segment| (*segment).to_string())
}
fn media_source_id_from_query(query: Option<&str>) -> Option<String> {
query.and_then(|query_str| {
form_urlencoded::parse(query_str.as_bytes())
.find(|(key, _)| Self::is_media_source_id_key(key))
.map(|(_, value)| value.into_owned())
})
}
fn media_source_id_from_body(
body: Option<&[u8]>,
content_type: Option<&str>,
) -> Option<String> {
let body = body?;
let trimmed = std::str::from_utf8(body).ok()?.trim();
if trimmed.is_empty() {
return None;
}
match Self::normalized_content_type(content_type) {
Some(CONTENT_TYPE_JSON) => Self::media_source_id_from_json(trimmed),
Some(CONTENT_TYPE_FORM_URLENCODED) => {
Self::media_source_id_from_form(trimmed)
}
Some("text/plain") | None => {
Self::media_source_id_from_json(trimmed)
.or_else(|| Self::media_source_id_from_form(trimmed))
}
_ => None,
}
}
fn media_source_id_from_json(body: &str) -> Option<String> {
let value = serde_json::from_str::<JsonValue>(body).ok()?;
match value {
JsonValue::Object(map) => {
map.into_iter().find_map(|(key, value)| {
if !Self::is_media_source_id_key(&key) {
return None;
}
value
.as_str()
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
})
}
_ => None,
}
}
fn media_source_id_from_form(body: &str) -> Option<String> {
form_urlencoded::parse(body.as_bytes())
.find(|(key, _)| Self::is_media_source_id_key(key))
.map(|(_, value)| value.into_owned())
}
fn is_media_source_id_key(key: &str) -> bool {
key.eq_ignore_ascii_case(PLAYBACK_INFO_MEDIA_SOURCE_ID_QUERY_KEY)
|| key.eq_ignore_ascii_case("mediaSourceId")
|| key.eq_ignore_ascii_case("media_source_id")
}
fn body_hash(content_type: Option<&str>, body: &[u8]) -> String {
match Self::normalized_content_type(content_type) {
Some(CONTENT_TYPE_JSON) => Self::json_body_hash(body),
Some(CONTENT_TYPE_FORM_URLENCODED) => Self::form_body_hash(body),
_ => StringUtil::hash_bytes(body),
}
}
fn normalized_content_type(content_type: Option<&str>) -> Option<&str> {
content_type
.map(str::trim)
.filter(|value| !value.is_empty())
.and_then(|value| value.split(';').next())
.map(str::trim)
}
fn json_body_hash(body: &[u8]) -> String {
let canonical_json = serde_json::from_slice::<JsonValue>(body)
.map(Self::canonicalize_json_value)
.and_then(|value| serde_json::to_vec(&value))
.unwrap_or_else(|_| body.to_vec());
StringUtil::hash_bytes(&canonical_json)
}
fn canonicalize_json_value(value: JsonValue) -> JsonValue {
match value {
JsonValue::Array(values) => JsonValue::Array(
values
.into_iter()
.map(Self::canonicalize_json_value)
.collect(),
),
JsonValue::Object(map) => {
let mut entries: Vec<(String, JsonValue)> = map
.into_iter()
.map(|(key, value)| {
(key, Self::canonicalize_json_value(value))
})
.collect();
entries.sort_by(|(left_key, _), (right_key, _)| {
left_key.cmp(right_key)
});
let canonical_map: JsonMap<String, JsonValue> =
entries.into_iter().collect();
JsonValue::Object(canonical_map)
}
other => other,
}
}
fn form_body_hash(body: &[u8]) -> String {
let Ok(form_body) = std::str::from_utf8(body) else {
return StringUtil::hash_bytes(body);
};
let trimmed = form_body.trim();
if trimmed.is_empty() {
return String::new();
}
let mut form_pairs: Vec<(String, String)> =
form_urlencoded::parse(trimmed.as_bytes())
.map(|(key, value)| (key.into_owned(), value.into_owned()))
.collect();
form_pairs.sort_by(
|(left_key, left_value), (right_key, right_value)| {
left_key
.to_ascii_lowercase()
.cmp(&right_key.to_ascii_lowercase())
.then_with(|| left_value.cmp(right_value))
},
);
let normalized_form = form_urlencoded::Serializer::new(String::new())
.extend_pairs(
form_pairs
.iter()
.map(|(key, value)| (key.as_str(), value.as_str())),
)
.finish();
StringUtil::hash_hex(&normalized_form)
}
}
#[derive(Debug)]
pub enum PlaybackInfoServiceError {
InvalidItemId,
InvalidMediaSourceId,
UnsupportedMethod,
EmptyApiToken,
Upstream(anyhow::Error),
}
impl std::fmt::Display for PlaybackInfoServiceError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::InvalidItemId => write!(f, "invalid playback info item id"),
Self::InvalidMediaSourceId => {
write!(f, "invalid playback info media source id")
}
Self::UnsupportedMethod => {
write!(f, "unsupported playback info method")
}
Self::EmptyApiToken => write!(f, "empty playback info api token"),
Self::Upstream(error) => {
write!(f, "playback info upstream: {error}")
}
}
}
}
impl std::error::Error for PlaybackInfoServiceError {}
#[derive(Clone)]
pub struct PlaybackInfoService {
state: Arc<AppState>,
}
impl PlaybackInfoService {
pub fn new(state: Arc<AppState>) -> Self {
Self { state }
}
pub async fn get(
&self,
request: &PlaybackInfoRequest,
api_token: Option<&str>,
) -> Result<PlaybackInfo, PlaybackInfoServiceError> {
let cache_key = request.cache_key()?;
let cache = self.state.get_playback_info_cache().await;
if let Some(cached) = cache.get::<PlaybackInfo>(&cache_key) {
info_log!(
PLAYBACK_INFO_LOGGER_DOMAIN,
"playback_info_cache_hit key={}",
cache_key
);
return Ok(cached);
}
let lock = AppState::request_lock(
&self.state.playback_info_request_locks,
&cache_key,
);
let result = {
let wait_start = Instant::now();
let _guard = lock.lock().await;
let wait_ms = wait_start.elapsed().as_millis();
if let Some(cached) = cache.get::<PlaybackInfo>(&cache_key) {
info_log!(
PLAYBACK_INFO_LOGGER_DOMAIN,
"playback_info_inflight_wait_hit key={} lock_wait_ms={}",
cache_key,
wait_ms
);
Ok(cached)
} else {
let token = api_token
.map(str::trim)
.filter(|token| !token.is_empty())
.ok_or(PlaybackInfoServiceError::EmptyApiToken)?;
let fetch_start = Instant::now();
let playback_info =
self.fetch_from_emby(request, token).await?;
let fetch_ms = fetch_start.elapsed().as_millis();
if fetch_ms >= SLOW_PLAYBACK_INFO_FETCH_THRESHOLD_MS {
warn_log!(
PLAYBACK_INFO_LOGGER_DOMAIN,
"playback_info_fetch_slow item_id={} media_source_id={} \
elapsed_ms={}",
request.item_id,
request.media_source_id,
fetch_ms
);
} else {
debug_log!(
PLAYBACK_INFO_LOGGER_DOMAIN,
"playback_info_fetch_complete item_id={} media_source_id={} \
elapsed_ms={}",
request.item_id,
request.media_source_id,
fetch_ms
);
}
cache.insert(cache_key.clone(), playback_info.clone());
info_log!(
PLAYBACK_INFO_LOGGER_DOMAIN,
"playback_info_cache_store key={} media_sources={}",
cache_key,
playback_info.media_sources.len()
);
Ok(playback_info)
}
};
AppState::cleanup_request_lock(
&self.state.playback_info_request_locks,
&cache_key,
&lock,
);
result
}
pub fn api_token_from_headers_and_query(
headers: &hyper::HeaderMap,
query: Option<&str>,
) -> Option<String> {
query
.and_then(Self::api_token_from_query)
.or_else(|| Self::api_token_from_headers(headers))
}
pub fn api_token_from_headers_query_and_body(
headers: &hyper::HeaderMap,
query: Option<&str>,
body: Option<&[u8]>,
content_type: Option<&str>,
) -> Option<String> {
Self::api_token_from_headers_and_query(headers, query)
.or_else(|| Self::api_token_from_body(body, content_type))
}
async fn fetch_from_emby(
&self,
request: &PlaybackInfoRequest,
api_token: &str,
) -> Result<PlaybackInfo, PlaybackInfoServiceError> {
let config = self.state.get_config().await;
let emby_server_url = config.emby.get_uri().to_string();
let emby_client = self.state.get_emby_client().await.clone();
emby_client
.playback_info(emby_server_url, api_token.to_string(), request)
.await
.map_err(PlaybackInfoServiceError::Upstream)
}
fn api_token_from_query(query: &str) -> Option<String> {
form_urlencoded::parse(query.as_bytes())
.find(|(key, _)| Self::is_api_token_key(key))
.map(|(_, value)| value.into_owned())
}
fn api_token_from_headers(headers: &hyper::HeaderMap) -> Option<String> {
headers
.get("X-Emby-Token")
.and_then(|value| value.to_str().ok())
.map(str::to_string)
.or_else(|| {
headers
.get("x-emby-authorization")
.and_then(|value| value.to_str().ok())
.and_then(InfuseAuthorization::from_header_str)
.and_then(|auth| auth.get("MediaBrowser Token"))
})
}
fn api_token_from_body(
body: Option<&[u8]>,
content_type: Option<&str>,
) -> Option<String> {
let body = body?;
let trimmed = std::str::from_utf8(body).ok()?.trim();
if trimmed.is_empty() {
return None;
}
match Self::normalized_body_content_type(content_type) {
Some(CONTENT_TYPE_JSON) => Self::api_token_from_json(trimmed),
Some(CONTENT_TYPE_FORM_URLENCODED) => {
Self::api_token_from_form(trimmed)
}
Some("text/plain") | None => Self::api_token_from_json(trimmed)
.or_else(|| Self::api_token_from_form(trimmed)),
_ => None,
}
}
fn api_token_from_json(body: &str) -> Option<String> {
let value = serde_json::from_str::<JsonValue>(body).ok()?;
match value {
JsonValue::Object(map) => {
map.into_iter().find_map(|(key, value)| {
if !Self::is_api_token_key(&key) {
return None;
}
value
.as_str()
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
})
}
_ => None,
}
}
fn api_token_from_form(body: &str) -> Option<String> {
form_urlencoded::parse(body.as_bytes())
.find(|(key, _)| Self::is_api_token_key(key))
.map(|(_, value)| value.into_owned())
}
fn is_api_token_key(key: &str) -> bool {
key.eq_ignore_ascii_case("api_key")
|| key.eq_ignore_ascii_case("X-Emby-Token")
|| key.eq_ignore_ascii_case("Token")
|| key.eq_ignore_ascii_case("MediaBrowser Token")
}
fn normalized_body_content_type(
content_type: Option<&str>,
) -> Option<&str> {
content_type
.map(str::trim)
.filter(|value| !value.is_empty())
.and_then(|value| value.split(';').next())
.map(str::trim)
}
}
#[cfg(test)]
mod tests {
use hyper::Method;
use super::{
CONTENT_TYPE_FORM_URLENCODED, CONTENT_TYPE_JSON, PlaybackInfoRequest,
PlaybackInfoService,
};
use crate::network::HttpMethod;
#[test]
fn playback_info_request_parses_get_path() {
let request = PlaybackInfoRequest::from_http_parts(
"/emby/Items/249971/PlaybackInfo",
Some("MediaSourceId=abc123&UserId=u1"),
&Method::GET,
None,
None,
);
assert!(request.is_ok());
if let Ok(request) = request {
assert_eq!(request.item_id, "249971");
assert_eq!(request.media_source_id, "abc123");
}
}
#[test]
fn playback_info_request_accepts_path_without_emby_prefix() {
let request = PlaybackInfoRequest::from_http_parts(
"/Items/249971/PlaybackInfo",
Some("MediaSourceId=abc123"),
&Method::GET,
None,
None,
);
assert!(request.is_ok());
if let Ok(request) = request {
assert_eq!(request.item_id, "249971");
assert_eq!(request.media_source_id, "abc123");
}
}
#[test]
fn playback_info_request_parses_media_source_id_from_json_body() {
let request = PlaybackInfoRequest::from_http_parts(
"/emby/Items/249971/PlaybackInfo",
Some("reqformat=json"),
&Method::POST,
Some(br#"{"MediaSourceId":"abc123"}"#),
Some(CONTENT_TYPE_JSON),
);
assert!(request.is_ok());
if let Ok(request) = request {
assert_eq!(request.item_id, "249971");
assert_eq!(request.media_source_id, "abc123");
}
}
#[test]
fn playback_info_request_parses_media_source_id_from_form_body() {
let request = PlaybackInfoRequest::from_http_parts(
"/emby/Items/249971/PlaybackInfo",
None,
&Method::POST,
Some(b"MediaSourceId=abc123"),
Some(CONTENT_TYPE_FORM_URLENCODED),
);
assert!(request.is_ok());
if let Ok(request) = request {
assert_eq!(request.media_source_id, "abc123");
}
}
#[test]
fn playback_info_cache_key_includes_method() {
let request = PlaybackInfoRequest::new(
"249971",
"ABC123",
HttpMethod::Get,
None,
None,
);
let cache_key = request.cache_key();
assert!(cache_key.is_ok());
assert_eq!(
cache_key.unwrap_or_default(),
"playback:info:method:get:item_id:249971:media_source_id:abc123"
);
}
#[test]
fn playback_info_cache_key_for_post_includes_body_hash() {
let request = PlaybackInfoRequest::new(
"249971",
"ABC123",
HttpMethod::Post,
Some(br#"{"AutoOpenLiveStream":true}"#.to_vec()),
Some("application/json".to_string()),
);
let cache_key = request.cache_key();
assert!(cache_key.is_ok());
let cache_key = cache_key.unwrap_or_default();
assert!(cache_key.starts_with(
"playback:info:method:post:item_id:249971:media_source_id:abc123"
));
assert!(cache_key.contains(":content_type_hash:"));
assert!(cache_key.contains(":body_hash:"));
}
#[test]
fn playback_info_json_body_hash_ignores_object_key_order() {
let request1 = PlaybackInfoRequest::new(
"249971",
"ABC123",
HttpMethod::Post,
Some(br#"{"B":2,"A":1}"#.to_vec()),
Some(CONTENT_TYPE_JSON.to_string()),
);
let request2 = PlaybackInfoRequest::new(
"249971",
"ABC123",
HttpMethod::Post,
Some(br#"{"A":1,"B":2}"#.to_vec()),
Some(CONTENT_TYPE_JSON.to_string()),
);
assert_eq!(request1.cache_key().ok(), request2.cache_key().ok());
}
#[test]
fn playback_info_json_body_hash_preserves_array_order() {
let request1 = PlaybackInfoRequest::new(
"249971",
"ABC123",
HttpMethod::Post,
Some(br#"{"A":[1,2]}"#.to_vec()),
Some(CONTENT_TYPE_JSON.to_string()),
);
let request2 = PlaybackInfoRequest::new(
"249971",
"ABC123",
HttpMethod::Post,
Some(br#"{"A":[2,1]}"#.to_vec()),
Some(CONTENT_TYPE_JSON.to_string()),
);
assert_ne!(request1.cache_key().ok(), request2.cache_key().ok());
}
#[test]
fn playback_info_form_body_hash_ignores_pair_order() {
let request1 = PlaybackInfoRequest::new(
"249971",
"ABC123",
HttpMethod::Post,
Some(b"X=1&Y=2".to_vec()),
Some(CONTENT_TYPE_FORM_URLENCODED.to_string()),
);
let request2 = PlaybackInfoRequest::new(
"249971",
"ABC123",
HttpMethod::Post,
Some(b"Y=2&X=1".to_vec()),
Some(CONTENT_TYPE_FORM_URLENCODED.to_string()),
);
assert_eq!(request1.cache_key().ok(), request2.cache_key().ok());
}
#[test]
fn playback_info_service_parses_api_token_from_unquoted_emby_header() {
let mut headers = hyper::HeaderMap::new();
headers.insert(
"x-emby-authorization",
hyper::header::HeaderValue::from_static(
"Emby UserId=user1,Client=Yamby,Device=Phone,DeviceId=device1,Version=1.0,Token=abc123",
),
);
assert_eq!(
PlaybackInfoService::api_token_from_headers_and_query(
&headers, None
),
Some("abc123".to_string())
);
}
#[test]
fn playback_info_service_parses_api_token_from_json_body() {
let headers = hyper::HeaderMap::new();
assert_eq!(
PlaybackInfoService::api_token_from_headers_query_and_body(
&headers,
None,
Some(br#"{"Token":"abc123"}"#),
Some(CONTENT_TYPE_JSON),
),
Some("abc123".to_string())
);
}
}