use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};
use std::time::{Duration, Instant};
use serde_json::Value;
use tokio::sync::OnceCell;
use tracing::Instrument;
use super::botguard;
use super::clients::{ANDROID_VR_1_61_48, STREAM_FALLBACK_CLIENTS, WEB_REMIX, YouTubeClient};
use super::decipher;
use super::innertube::{self, PlayerExtras};
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AudioFormat {
Webm,
M4a,
}
impl AudioFormat {
pub fn extension(self) -> &'static str {
match self {
AudioFormat::Webm => "webm",
AudioFormat::M4a => "m4a",
}
}
fn from_mime(mime: &str) -> Option<AudioFormat> {
if mime.contains("webm") {
Some(AudioFormat::Webm)
} else if mime.contains("mp4") {
Some(AudioFormat::M4a)
} else {
None
}
}
}
#[derive(Clone, Debug)]
pub struct YtStreamInfo {
pub url: String,
pub format: AudioFormat,
pub user_agent: String,
pub content_length: Option<u64>,
pub duration_secs: Option<u64>,
pub bitrate: Option<u32>,
pub itag: Option<u32>,
pub range_safe: bool,
}
static VISITOR_DATA: OnceCell<String> = OnceCell::const_new();
async fn visitor_data(cookies: Option<&str>) -> Result<&'static str, String> {
VISITOR_DATA
.get_or_try_init(|| async { innertube::visitor_id(cookies).await })
.await
.map(|s| s.as_str())
}
#[tracing::instrument(name = "yt.resolve", skip(cookies), fields(video_id = %video_id, anon = cookies.is_none()))]
pub async fn resolve(video_id: &str, cookies: Option<&str>) -> Result<YtStreamInfo, String> {
let mut decipher_fallback: Option<YtStreamInfo> = None;
if let Some(c) = cookies {
let uid = super::derive_user_id(c);
if let Some(u) = &uid {
seed_tier_from_db(u).await;
}
let skip = uid.as_deref().is_some_and(known_non_premium) && botguard::is_available();
if !skip {
match try_native_decipher(video_id, cookies).await {
Ok(info) if is_premium_itag(info.itag) => {
if let Some(u) = &uid {
remember_tier(u, true);
}
return Ok(info);
}
Ok(info) => {
if let Some(u) = &uid {
remember_tier(u, false);
}
tracing::debug!(itag = ?info.itag, "signed-in but non-Premium — needs a content pot, trying ANDROID_VR");
decipher_fallback = Some(info);
}
Err(e) => tracing::debug!(error = %e, "premium decipher failed — falling back"),
}
}
}
let mut last_err = {
let (pot, visitor) = tokio::join!(botguard::mint_content_pot(video_id), visitor_data(None));
match (pot, visitor) {
(Ok(pot), Ok(visitor)) => {
let extras = PlayerExtras {
content_pot: Some(&pot),
visitor_data: Some(visitor),
signature_timestamp: None,
};
match innertube::player(ANDROID_VR_1_61_48, video_id, None, extras).await {
Ok(json) => {
let status = PlayabilityStatus::from_response(&json);
if status == PlayabilityStatus::Ok {
if let Some(info) = pick_plain_format(&json, ANDROID_VR_1_61_48) {
return Ok(info);
}
"ANDROID_VR+pot: no plain audio format".to_string()
} else {
format!(
"ANDROID_VR+pot playability {}: {}",
status.as_str(),
playability_reason(&json)
)
}
}
Err(e) => format!("ANDROID_VR+pot: {e}"),
}
}
(Err(e), _) => format!("PO mint: {e}"),
(_, Err(e)) => format!("visitor_data: {e}"),
}
};
tracing::debug!(%last_err, "ANDROID_VR+pot failed — trying bare clients");
for client in STREAM_FALLBACK_CLIENTS {
let cookies_for = if client.login_supported {
cookies
} else {
None
};
match innertube::player(*client, video_id, cookies_for, PlayerExtras::default()).await {
Ok(json) => {
let status = PlayabilityStatus::from_response(&json);
if !status.is_attemptable() {
last_err = format!(
"{} playability {}: {}",
client.client_name,
status.as_str(),
playability_reason(&json)
);
continue;
}
if let Some(info) = pick_plain_format(&json, *client) {
return Ok(info);
}
last_err = format!("{} returned no plain audio formats", client.client_name);
}
Err(e) => last_err = format!("{}: {e}", client.client_name),
}
}
if let Some(mut info) = decipher_fallback {
tracing::warn!(
"no content pot available (minter not running?) — using the non-Premium decipher \
stream sequentially (range requests 403 without a pot, so seeking is disabled)"
);
info.range_safe = false;
return Ok(info);
}
Err(format!("all stream paths failed; last error: {last_err}"))
}
fn is_premium_itag(itag: Option<u32>) -> bool {
matches!(itag, Some(774 | 141 | 256 | 258))
}
static ACCOUNT_PREMIUM: OnceLock<Mutex<HashMap<String, (Instant, bool)>>> = OnceLock::new();
static TIER_DB: OnceLock<db::Db> = OnceLock::new();
const FREE_TIER_TTL: Duration = Duration::from_secs(30 * 60);
const TIER_META_KIND: &str = "yt_tier_v2";
pub fn init_tier_store(handle: db::Db) {
let _ = TIER_DB.set(handle);
}
fn account_premium() -> &'static Mutex<HashMap<String, (Instant, bool)>> {
ACCOUNT_PREMIUM.get_or_init(|| Mutex::new(HashMap::new()))
}
fn known_non_premium(user_id: &str) -> bool {
matches!(
account_premium().lock().ok().and_then(|m| m.get(user_id).copied()),
Some((at, false)) if at.elapsed() < FREE_TIER_TTL
)
}
async fn seed_tier_from_db(user_id: &str) {
{
let Ok(m) = account_premium().lock() else {
return;
};
if m.contains_key(user_id) {
return;
}
}
let Some(handle) = TIER_DB.get() else { return };
let Ok(Some(payload)) = handle.meta_get(user_id, TIER_META_KIND).await else {
return;
};
let (verdict, ts) = match payload.split_once(':') {
Some((v, t)) => (v.to_string(), t.parse::<u64>().unwrap_or(0)),
None => (payload, 0),
};
let premium = verdict == "premium";
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let age = Duration::from_secs(now.saturating_sub(ts));
if !premium && age >= FREE_TIER_TTL {
return; }
let seeded_at = Instant::now().checked_sub(age).unwrap_or_else(Instant::now);
if let Ok(mut m) = account_premium().lock() {
m.entry(user_id.to_string()).or_insert((seeded_at, premium));
}
}
fn remember_tier(user_id: &str, premium: bool) {
if !premium {
let was_premium = account_premium()
.lock()
.ok()
.and_then(|m| m.get(user_id).map(|(_, p)| *p))
.unwrap_or(false);
if was_premium {
tracing::info!(
"yt: non-premium itag from a known-Premium account — keeping the premium verdict (track without premium encodes, or a transient auth hiccup)"
);
return;
}
}
if let Ok(mut m) = account_premium().lock() {
m.insert(user_id.to_string(), (Instant::now(), premium));
}
if let Some(handle) = TIER_DB.get() {
let handle = handle.clone();
let uid = user_id.to_string();
tokio::spawn(
async move {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let payload = format!("{}:{now}", if premium { "premium" } else { "free" });
let _ = handle.meta_put(&uid, TIER_META_KIND, &payload).await;
}
.instrument(tracing::info_span!("yt.tier_persist")),
);
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PlayabilityStatus {
Ok,
Unknown,
LoginRequired,
Unplayable,
Error,
AgeCheck,
Other,
}
impl PlayabilityStatus {
fn from_response(json: &Value) -> Self {
match json
.pointer("/playabilityStatus/status")
.and_then(|v| v.as_str())
{
Some("OK") => PlayabilityStatus::Ok,
Some("LOGIN_REQUIRED") => PlayabilityStatus::LoginRequired,
Some("UNPLAYABLE") => PlayabilityStatus::Unplayable,
Some("ERROR") => PlayabilityStatus::Error,
Some("AGE_CHECK_REQUIRED") | Some("CONTENT_CHECK_REQUIRED") => {
PlayabilityStatus::AgeCheck
}
Some(_) => PlayabilityStatus::Other,
None => PlayabilityStatus::Unknown,
}
}
fn as_str(self) -> &'static str {
match self {
PlayabilityStatus::Ok => "OK",
PlayabilityStatus::Unknown => "UNKNOWN",
PlayabilityStatus::LoginRequired => "LOGIN_REQUIRED",
PlayabilityStatus::Unplayable => "UNPLAYABLE",
PlayabilityStatus::Error => "ERROR",
PlayabilityStatus::AgeCheck => "AGE_CHECK_REQUIRED",
PlayabilityStatus::Other => "OTHER",
}
}
fn is_attemptable(self) -> bool {
matches!(self, PlayabilityStatus::Ok | PlayabilityStatus::Unknown)
}
}
fn playability_reason(json: &Value) -> &str {
json.pointer("/playabilityStatus/reason")
.and_then(|v| v.as_str())
.unwrap_or("")
}
fn pick_plain_format(json: &Value, client: YouTubeClient) -> Option<YtStreamInfo> {
let formats = json
.pointer("/streamingData/adaptiveFormats")
.and_then(|v| v.as_array())?;
let mut best_webm: Option<(&Value, u64)> = None;
let mut best_m4a: Option<(&Value, u64)> = None;
for f in formats {
let mime = f.get("mimeType").and_then(|v| v.as_str()).unwrap_or("");
if !mime.starts_with("audio/") {
continue;
}
if f.get("url").and_then(|v| v.as_str()).is_none() {
continue;
}
let bitrate = f.get("bitrate").and_then(|v| v.as_u64()).unwrap_or(0);
if mime.contains("webm") && best_webm.map(|(_, b)| bitrate > b).unwrap_or(true) {
best_webm = Some((f, bitrate));
}
if mime.contains("mp4") && best_m4a.map(|(_, b)| bitrate > b).unwrap_or(true) {
best_m4a = Some((f, bitrate));
}
}
let (fmt, bitrate) = best_webm.or(best_m4a)?;
let url = fmt.get("url")?.as_str()?.to_string();
let mime = fmt.get("mimeType")?.as_str()?;
let format = AudioFormat::from_mime(mime)?;
let itag = fmt.get("itag").and_then(|v| v.as_u64()).map(|v| v as u32);
let vid = json
.pointer("/videoDetails/videoId")
.and_then(|v| v.as_str())
.unwrap_or("?");
tracing::info!(video_id = %vid, itag = itag.unwrap_or(0), kbps = bitrate / 1000, mime, client = client.client_name, "stream resolved (plain)");
let content_length = fmt
.get("contentLength")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<u64>().ok());
let duration_secs = json
.pointer("/videoDetails/lengthSeconds")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<u64>().ok())
.or_else(|| {
fmt.get("approxDurationMs")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<u64>().ok())
.map(|ms| (ms + 500) / 1000)
});
Some(YtStreamInfo {
url,
format,
user_agent: client.user_agent.to_string(),
content_length,
duration_secs,
bitrate: Some(bitrate as u32),
itag,
range_safe: true,
})
}
fn pick_best_audio(json: &Value) -> Option<&Value> {
json.pointer("/streamingData/adaptiveFormats")
.and_then(|v| v.as_array())?
.iter()
.filter(|f| {
f.get("mimeType")
.and_then(|v| v.as_str())
.map(|m| m.starts_with("audio/"))
.unwrap_or(false)
})
.max_by_key(|f| f.get("bitrate").and_then(|v| v.as_u64()).unwrap_or(0))
}
fn stream_info_from(
json: &Value,
fmt: &Value,
url: String,
client: YouTubeClient,
) -> Option<YtStreamInfo> {
let mime = fmt.get("mimeType")?.as_str()?;
let format = AudioFormat::from_mime(mime)?;
let content_length = fmt
.get("contentLength")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<u64>().ok());
let duration_secs = json
.pointer("/videoDetails/lengthSeconds")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<u64>().ok())
.or_else(|| {
fmt.get("approxDurationMs")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<u64>().ok())
.map(|ms| (ms + 500) / 1000)
});
let bitrate = fmt
.get("bitrate")
.and_then(|v| v.as_u64())
.map(|v| v as u32);
let itag = fmt.get("itag").and_then(|v| v.as_u64()).map(|v| v as u32);
let vid = json
.pointer("/videoDetails/videoId")
.and_then(|v| v.as_str())
.unwrap_or("?");
tracing::info!(video_id = %vid, itag = itag.unwrap_or(0), kbps = bitrate.unwrap_or(0) / 1000, mime, client = client.client_name, "stream resolved (decipher)");
Some(YtStreamInfo {
url,
format,
user_agent: client.user_agent.to_string(),
content_length,
duration_secs,
bitrate,
itag,
range_safe: true,
})
}
async fn try_native_decipher(
video_id: &str,
cookies: Option<&str>,
) -> Result<YtStreamInfo, String> {
let player = decipher::player_js(video_id).await?;
let extras = PlayerExtras {
signature_timestamp: Some(player.1),
..Default::default()
};
let json = innertube::player(WEB_REMIX, video_id, cookies, extras).await?;
let status = PlayabilityStatus::from_response(&json);
if status != PlayabilityStatus::Ok {
return Err(format!(
"WEB_REMIX playability {}: {}",
status.as_str(),
playability_reason(&json)
));
}
let fmt = pick_best_audio(&json).ok_or("WEB_REMIX returned no audio format")?;
let url = decipher::deciphered_url(&player.0, fmt).await?;
stream_info_from(&json, fmt, url, WEB_REMIX)
.ok_or_else(|| "deciphered format missing fields".to_string())
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn pick_plain_format_carries_bitrate_and_itag() {
let json = json!({
"streamingData": { "adaptiveFormats": [
{ "itag": 251, "mimeType": "audio/webm; codecs=\"opus\"",
"bitrate": 136544, "contentLength": "3433755",
"url": "https://r.googlevideo.com/v?n=N" }
]},
"videoDetails": { "lengthSeconds": "212" }
});
let info = pick_plain_format(&json, WEB_REMIX).expect("should pick a plain format");
assert_eq!(info.itag, Some(251));
assert_eq!(info.bitrate, Some(136544));
assert_eq!(info.duration_secs, Some(212));
}
#[test]
fn stream_info_from_carries_bitrate_and_itag() {
let json = json!({ "videoDetails": { "lengthSeconds": "212" } });
let fmt = json!({ "itag": 774, "mimeType": "audio/webm; codecs=\"opus\"",
"bitrate": 270204, "contentLength": "6852699" });
let info = stream_info_from(&json, &fmt, "https://x/y".into(), WEB_REMIX)
.expect("should build stream info");
assert_eq!(info.itag, Some(774));
assert_eq!(info.bitrate, Some(270204));
assert_eq!(info.duration_secs, Some(212));
}
#[tokio::test]
#[ignore = "hits live YouTube + needs a system JS runtime"]
async fn resolve_populates_bitrate_itag_duration() {
let info = resolve("dQw4w9WgXcQ", None)
.await
.expect("resolve should succeed");
tracing::debug!(
"[test] resolved itag={:?} bitrate={:?} kbps duration={:?}s",
info.itag,
info.bitrate.map(|b| b / 1000),
info.duration_secs,
);
assert!(info.itag.is_some(), "itag must be set");
assert!(
info.bitrate.unwrap_or(0) > 0,
"bitrate must be > 0, got {:?}",
info.bitrate
);
assert!(info.duration_secs.unwrap_or(0) > 0, "duration must be set");
}
}