use futures::stream::{self, BoxStream};
use serde_json::Value;
use std::sync::Arc;
use super::innertube::{BrowseRequest, InnerTube};
use crate::types::{Entry, Thumbnail};
/// Which kind of listing a paginated InnerTube response belongs to.
///
/// The variant selects the endpoint used to fetch continuation pages.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum PageKind {
    /// A playlist listing; continuation pages are fetched through `browse`.
    Playlist,
    /// A channel listing; continuation pages are fetched through `browse`.
    Channel,
    /// A search listing. The payload is the query string, which is passed to `search`
    /// together with every continuation token.
    Search(String),
}
/// Upper bound on how many continuation pages a single entry stream will request.
const MAX_PAGES: u32 = 10_000;
/// How many fetched pages in a row may contain zero entries before the stream gives up.
const MAX_CONSECUTIVE_EMPTY_PAGES: u32 = 3;
/// State threaded through the `stream::unfold` loop inside `entry_stream`.
struct PageState {
    /// Client used to request continuation pages.
    it: Arc<InnerTube>,
    /// Listing kind; decides whether continuations go through `browse` or `search`.
    kind: PageKind,
    /// Continuation token of the next page to request, if any.
    next: Option<String>,
    /// Parsed entries that have not been yielded to the consumer yet.
    buffer: std::collections::VecDeque<Entry>,
    /// The caller-supplied first page; `Some` until it has been parsed once.
    first_page: Option<Value>,
    /// Number of continuation pages requested so far (checked against `MAX_PAGES`).
    pages_fetched: u32,
    /// Every continuation token already requested; a repeat ends the stream.
    seen_tokens: std::collections::HashSet<String>,
    /// Number of fetched pages in a row that yielded no entries.
    consecutive_empty: u32,
}
/// Turns an already-fetched first page plus its continuation chain into a lazy stream of
/// [`Entry`] values.
///
/// Entries parsed from `first_page` are yielded first. Each continuation token is then
/// fetched on demand and its entries are yielded in order:
/// - [`PageKind::Playlist`] and [`PageKind::Channel`] fetch through `browse`.
/// - [`PageKind::Search`] fetches through `search`.
///
/// The stream ends as soon as any of these holds:
/// - the last page carried no continuation token;
/// - [`MAX_PAGES`] continuation pages have already been requested;
/// - a continuation token repeats (cycle guard);
/// - [`MAX_CONSECUTIVE_EMPTY_PAGES`] fetched pages in a row contained no entries.
///
/// A failed fetch is yielded once as `Err`. The token of that page has already been
/// consumed, so the following poll ends the stream.
pub(crate) fn entry_stream(
    it: Arc<InnerTube>,
    first_page: Value,
    kind: PageKind,
) -> BoxStream<'static, crate::Result<Entry>> {
    let initial = PageState {
        it,
        kind,
        next: None,
        buffer: std::collections::VecDeque::new(),
        first_page: Some(first_page),
        pages_fetched: 0,
        seen_tokens: std::collections::HashSet::new(),
        consecutive_empty: 0,
    };
    Box::pin(stream::unfold(initial, |mut st| async move {
        loop {
            // Hand out anything already buffered before doing more work.
            if let Some(entry) = st.buffer.pop_front() {
                return Some((Ok(entry), st));
            }

            // The first page was fetched by the caller: parse it once, no request needed.
            if let Some(page) = st.first_page.take() {
                let (batch, cont) = parse_entries(&page, &st.kind);
                st.buffer.extend(batch);
                st.next = cont;
                continue;
            }

            // Nothing buffered and no continuation left: the listing is exhausted.
            let cont = st.next.take()?;
            let over_budget = st.pages_fetched >= MAX_PAGES;
            // `HashSet::insert` returns false for a token that was requested before.
            if over_budget || !st.seen_tokens.insert(cont.clone()) {
                return None;
            }
            st.pages_fetched += 1;

            let response = match &st.kind {
                PageKind::Search(query) => st.it.search(query, Some(&cont)).await,
                PageKind::Playlist | PageKind::Channel => {
                    let request = BrowseRequest {
                        continuation: Some(cont),
                        ..BrowseRequest::default()
                    };
                    st.it.browse(request).await
                }
            };
            let page = match response {
                Ok(page) => page,
                // `st.next` is already `None`, so the stream ends after reporting this.
                Err(err) => return Some((Err(err), st)),
            };

            let (batch, cont) = parse_entries(&page, &st.kind);
            if batch.is_empty() {
                st.consecutive_empty += 1;
                if st.consecutive_empty >= MAX_CONSECUTIVE_EMPTY_PAGES {
                    return None;
                }
            } else {
                st.consecutive_empty = 0;
            }
            st.buffer.extend(batch);
            st.next = cont;
        }
    }))
}
/// Extracts every video entry and the first continuation token from one response page.
///
/// `_kind` is currently not consulted: the same renderer-driven walk (`collect`) is used
/// for playlist, channel and search payloads.
fn parse_entries(page: &Value, _kind: &PageKind) -> (Vec<Entry>, Option<String>) {
    let mut out: (Vec<Entry>, Option<String>) = (Vec::new(), None);
    let (entries, token) = &mut out;
    collect(page, entries, token);
    out
}
/// Recursively walks `node`, appending every recognised video renderer to `entries` and
/// recording the first continuation token it meets in `token`.
///
/// Recognised object keys:
/// - `playlistVideoRenderer` / `videoRenderer`: parsed with `parse_renderer` and not
///   descended into further.
/// - `richItemRenderer`: its `content.videoRenderer` is parsed when present. Otherwise the
///   wrapper is walked like any other object.
/// - `continuationCommand`: its `token` string is stored only if `token` is still `None`.
///
/// Every other object value and every array element is descended into; scalars are ignored.
///
/// NOTE(review): object keys are visited in `serde_json::Map` iteration order, which is
/// sorted by key unless serde_json's `preserve_order` feature is enabled. That order decides
/// which continuation token counts as "first".
fn collect(node: &Value, entries: &mut Vec<Entry>, token: &mut Option<String>) {
    let fields = match node {
        Value::Array(items) => {
            for item in items {
                collect(item, entries, token);
            }
            return;
        }
        Value::Object(fields) => fields,
        _ => return,
    };

    for (name, child) in fields {
        match name.as_str() {
            "playlistVideoRenderer" | "videoRenderer" => entries.extend(parse_renderer(child)),
            "richItemRenderer" => {
                match child.get("content").and_then(|c| c.get("videoRenderer")) {
                    Some(video) => entries.extend(parse_renderer(video)),
                    None => collect(child, entries, token),
                }
            }
            "continuationCommand" => {
                if token.is_none() {
                    *token = child.get("token").and_then(Value::as_str).map(str::to_owned);
                }
            }
            _ => collect(child, entries, token),
        }
    }
}
/// Builds an [`Entry`] from a `playlistVideoRenderer` / `videoRenderer` JSON object.
///
/// Returns `None` when the renderer has no string `videoId`; every other field is optional.
///
/// The duration is taken from `lengthSeconds`, which may be a numeric string or a JSON
/// integer. When that is missing or unparsable, the clock-style `lengthText` label
/// (e.g. `"4:13"`, `"1:02:03"`) is used instead.
///
/// NOTE(review): the fallback assumes search/channel `videoRenderer` payloads expose their
/// duration only as `lengthText`. That matches observed InnerTube responses; confirm if the
/// schema changes.
fn parse_renderer(r: &Value) -> Option<Entry> {
    let id = r.get("videoId").and_then(Value::as_str)?.to_string();
    let title = extract_text(r.get("title"));
    let duration = r
        .get("lengthSeconds")
        .and_then(|v| match v.as_str() {
            Some(s) => s.parse::<u64>().ok(),
            None => v.as_u64(),
        })
        .or_else(|| {
            let label = extract_text(r.get("lengthText"))?;
            parse_clock_duration(&label)
        })
        .map(std::time::Duration::from_secs);
    let thumbnails = r
        .get("thumbnail")
        .and_then(|t| t.get("thumbnails"))
        .and_then(Value::as_array)
        .map(|arr| arr.iter().filter_map(parse_thumbnail).collect())
        .unwrap_or_default();
    Some(Entry {
        url: format!("https://www.youtube.com/watch?v={id}"),
        id,
        title,
        duration,
        thumbnails,
    })
}

/// Parses a clock-style duration label (`"SS"`, `"M:SS"` or `"H:MM:SS"`) into whole seconds.
///
/// Returns `None` for anything that is not one to three colon-separated runs of ASCII
/// digits. This covers empty labels and badges such as `"LIVE"`, and also overflow.
fn parse_clock_duration(label: &str) -> Option<u64> {
    let mut total: u64 = 0;
    for (index, part) in label.trim().split(':').enumerate() {
        if index >= 3 || part.is_empty() || !part.bytes().all(|b| b.is_ascii_digit()) {
            return None;
        }
        total = total.checked_mul(60)?.checked_add(part.parse().ok()?)?;
    }
    Some(total)
}
/// Builds a [`Thumbnail`] from one entry of a renderer's `thumbnail.thumbnails` array.
///
/// Returns `None` when the object has no string `url`. `width` and `height` are optional.
/// A dimension that is not a non-negative integer, or that does not fit in `u32`, becomes
/// `None` instead of being silently truncated by an `as` cast.
fn parse_thumbnail(t: &Value) -> Option<Thumbnail> {
    let url = t.get("url").and_then(Value::as_str)?.to_string();
    let dimension = |key: &str| {
        t.get(key)
            .and_then(Value::as_u64)
            .and_then(|v| u32::try_from(v).ok())
    };
    Some(Thumbnail {
        url,
        width: dimension("width"),
        height: dimension("height"),
    })
}
/// Reads display text from an InnerTube text object.
///
/// A string `simpleText` is returned as-is, even when empty. Otherwise the `text` fields of
/// the `runs` array are concatenated in order, skipping runs without a string `text`.
/// Returns `None` when `node` is `None`, when there is neither a string `simpleText` nor a
/// `runs` array, or when the concatenated runs are empty.
fn extract_text(node: Option<&Value>) -> Option<String> {
    let node = node?;
    match node.get("simpleText").and_then(Value::as_str) {
        Some(simple) => Some(simple.to_owned()),
        None => {
            let joined: String = node
                .get("runs")
                .and_then(Value::as_array)?
                .iter()
                .filter_map(|run| run.get("text").and_then(Value::as_str))
                .collect();
            Some(joined).filter(|text| !text.is_empty())
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use std::sync::atomic::{AtomicU32, Ordering};
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, Request, ResponseTemplate};

    /// Loads a JSON fixture from `tests/fixtures/innertube/` under the crate root.
    fn fixture(name: &str) -> Value {
        let location = format!(
            "{}/tests/fixtures/innertube/{name}",
            env!("CARGO_MANIFEST_DIR")
        );
        let raw = std::fs::read_to_string(location).expect("fixture readable");
        serde_json::from_str(&raw).expect("fixture is valid JSON")
    }

    /// An `InnerTube` client whose requests go to `server` instead of YouTube.
    fn client_for(server: &MockServer) -> Arc<InnerTube> {
        Arc::new(InnerTube::with_base_url(
            reqwest::Client::new(),
            server.uri(),
        ))
    }

    /// Runs `entry_stream` to completion, panicking on the first error item.
    async fn collect_entries(it: Arc<InnerTube>, first: Value, kind: PageKind) -> Vec<Entry> {
        entry_stream(it, first, kind)
            .map(|r| r.expect("entry"))
            .collect()
            .await
    }

    /// The video ids of `entries`, in stream order.
    fn video_ids(entries: &[Entry]) -> Vec<&str> {
        entries.iter().map(|e| e.id.as_str()).collect()
    }

    /// Page 1 comes from a fixture. Page 2 must be requested exactly once, with page 1's
    /// token. Page 2 has no continuation, so the stream stops there.
    #[tokio::test]
    async fn playlist_stream_paginates_until_no_continuation() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/youtubei/v1/browse"))
            .respond_with(|req: &Request| {
                let body: Value = serde_json::from_slice(&req.body).unwrap_or(Value::Null);
                assert_eq!(
                    body.get("continuation").and_then(Value::as_str),
                    Some("CONT_TOKEN_PAGE2"),
                    "continuation token must be forwarded in the browse body"
                );
                ResponseTemplate::new(200).set_body_json(fixture("browse_playlist_page2.json"))
            })
            .expect(1)
            .mount(&server)
            .await;

        let entries = collect_entries(
            client_for(&server),
            fixture("browse_playlist_page1.json"),
            PageKind::Playlist,
        )
        .await;

        assert_eq!(
            video_ids(&entries),
            vec!["aaaaaaaaaaa", "bbbbbbbbbbb", "ccccccccccc"]
        );
        assert_eq!(
            entries[0].url,
            "https://www.youtube.com/watch?v=aaaaaaaaaaa"
        );
        assert_eq!(entries[0].title.as_deref(), Some("First Video"));
        assert_eq!(
            entries[0].duration,
            Some(std::time::Duration::from_secs(100))
        );
        assert_eq!(
            entries[2].duration,
            Some(std::time::Duration::from_secs(300))
        );
    }

    /// Every page hands back the same token; the second sighting must end the stream.
    #[tokio::test]
    async fn pagination_breaks_continuation_token_cycle() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/youtubei/v1/browse"))
            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
                "onResponseReceivedActions": [{
                    "appendContinuationItemsAction": {
                        "continuationItems": [
                            { "playlistVideoRenderer": { "videoId": "loopvideo00", "title": { "runs": [{ "text": "Loop" }] } } },
                            { "continuationItemRenderer": { "continuationEndpoint": {
                                "continuationCommand": { "token": "SAME_TOKEN" } } } }
                        ]
                    }
                }]
            })))
            .mount(&server)
            .await;

        let first = serde_json::json!({
            "contents": { "x": [
                { "playlistVideoRenderer": { "videoId": "firstentry0", "title": { "runs": [{ "text": "First" }] } } },
                { "continuationItemRenderer": { "continuationEndpoint": {
                    "continuationCommand": { "token": "SAME_TOKEN" } } } }
            ]}
        });
        let entries = collect_entries(client_for(&server), first, PageKind::Playlist).await;

        assert_eq!(video_ids(&entries), vec!["firstentry0", "loopvideo00"]);
    }

    /// Every page is empty but carries a brand-new token, so the cycle guard never fires.
    /// Only the empty-page guard can end the stream.
    #[tokio::test]
    async fn pagination_stops_on_no_progress_fresh_tokens() {
        let server = MockServer::start().await;
        let hits = Arc::new(AtomicU32::new(0));
        let hits_resp = Arc::clone(&hits);
        Mock::given(method("POST"))
            .and(path("/youtubei/v1/browse"))
            .respond_with(move |_req: &Request| {
                let n = hits_resp.fetch_add(1, Ordering::SeqCst);
                ResponseTemplate::new(200).set_body_json(serde_json::json!({
                    "onResponseReceivedActions": [{ "appendContinuationItemsAction": {
                        "continuationItems": [
                            { "continuationItemRenderer": { "continuationEndpoint": {
                                "continuationCommand": { "token": format!("FRESH_{n}") } } } }
                        ]
                    }}]
                }))
            })
            .mount(&server)
            .await;

        let first = serde_json::json!({
            "contents": { "x": [
                { "playlistVideoRenderer": { "videoId": "firstentry0", "title": { "runs": [{ "text": "First" }] } } },
                { "continuationItemRenderer": { "continuationEndpoint": {
                    "continuationCommand": { "token": "FRESH_START" } } } }
            ]}
        });
        let entries = collect_entries(client_for(&server), first, PageKind::Playlist).await;

        assert_eq!(video_ids(&entries), vec!["firstentry0"]);
        assert!(
            hits.load(Ordering::SeqCst) <= MAX_CONSECUTIVE_EMPTY_PAGES,
            "no-progress guard must stop after a few empty pages"
        );
    }

    /// Search continuations must go to `/search` (not `/browse`) carrying the page-1 token.
    #[tokio::test]
    async fn search_stream_paginates_via_search_endpoint() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/youtubei/v1/search"))
            .and(|req: &Request| {
                let body: Value = serde_json::from_slice(&req.body).unwrap_or(Value::Null);
                body.get("continuation").is_none() && body["query"] == "rust"
            })
            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
                "contents": { "twoColumnSearchResultsRenderer": { "primaryContents": {
                    "sectionListRenderer": { "contents": [
                        { "itemSectionRenderer": { "contents": [
                            { "videoRenderer": { "videoId": "searchres01", "title": { "runs": [{ "text": "S1" }] } } }
                        ]}},
                        { "continuationItemRenderer": { "continuationEndpoint": {
                            "continuationCommand": { "token": "SEARCH_TOK_2" } } } }
                    ]}
                }}}
            })))
            .mount(&server)
            .await;
        Mock::given(method("POST"))
            .and(path("/youtubei/v1/search"))
            .and(|req: &Request| {
                let body: Value = serde_json::from_slice(&req.body).unwrap_or(Value::Null);
                body.get("continuation").and_then(Value::as_str) == Some("SEARCH_TOK_2")
            })
            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
                "onResponseReceivedCommands": [{ "appendContinuationItemsAction": {
                    "continuationItems": [
                        { "itemSectionRenderer": { "contents": [
                            { "videoRenderer": { "videoId": "searchres02", "title": { "runs": [{ "text": "S2" }] } } }
                        ]}}
                    ]
                }}]
            })))
            .mount(&server)
            .await;

        let it = client_for(&server);
        let first = it.search("rust", None).await.unwrap();
        let entries = collect_entries(it, first, PageKind::Search("rust".to_string())).await;

        assert_eq!(video_ids(&entries), vec!["searchres01", "searchres02"]);
    }

    /// Channel grids wrap each `videoRenderer` in `richItemRenderer.content`.
    #[tokio::test]
    async fn channel_stream_unwraps_rich_item_renderer() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/youtubei/v1/browse"))
            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
                "onResponseReceivedActions": [{ "appendContinuationItemsAction": {
                    "continuationItems": [
                        { "richItemRenderer": { "content": {
                            "videoRenderer": { "videoId": "chanvideo02", "title": { "runs": [{ "text": "C2" }] } }
                        }}}
                    ]
                }}]
            })))
            .mount(&server)
            .await;

        let first = serde_json::json!({
            "contents": { "x": [
                { "richItemRenderer": { "content": {
                    "videoRenderer": { "videoId": "chanvideo01", "title": { "runs": [{ "text": "C1" }] } }
                }}},
                { "continuationItemRenderer": { "continuationEndpoint": {
                    "continuationCommand": { "token": "CHAN_TOK_2" } } } }
            ]}
        });
        let entries = collect_entries(client_for(&server), first, PageKind::Channel).await;

        assert_eq!(video_ids(&entries), vec!["chanvideo01", "chanvideo02"]);
    }
}