1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
use std::time::Duration;
use hls_m3u8::MediaPlaylist;
use patricia_tree::PatriciaSet;
use reqwest::{Client, Request, Url};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tracing::{debug, trace, warn};
use crate::{
hls::{clone_request, HlsQueue, HLS_MAX_RETRIES},
Error,
};
/// Watches an HLS media playlist and queues the url of every newly seen segment.
///
/// Built with `HlsWatch::new`, which also hands out the receiving end of the
/// queue, and driven by `HlsWatch::run`.
pub struct HlsWatch {
/// Sending half of the unbounded queue; `run` pushes `HlsQueue::Url` and
/// `HlsQueue::StreamOver` items onto it.
tx: UnboundedSender<HlsQueue>,
/// Playlist request, kept as a template and cloned for every poll.
request: Request,
/// Client used to execute the cloned playlist request.
http: Client,
/// Segment uris already seen, used to skip segments repeated by later polls.
links: PatriciaSet,
/// The request url joined with `"."`; prefix used for relative segment uris.
master_url: Url,
/// Passed to `clone_request` on every poll (initialised to 10 seconds).
/// NOTE(review): presumably the per-request timeout — confirm in `clone_request`.
timeout: Duration,
/// Incremented on every failed download/parse and after every poll, reset to
/// zero when a new segment shows up; `run` stops once it exceeds `HLS_MAX_RETRIES`.
fail_counter: usize,
/// Optional predicate on the raw segment uri; segments for which it returns
/// `false` are not queued. `None` queues everything.
filter: Option<fn(&str) -> bool>,
}
impl HlsWatch {
/// Filter will filter any url that returns `false`, if `None` it will not filter anything.
/// For example if you want filter preloading segments use: `|e| !(e.contains("preloading"))`.
pub fn new(
request: Request,
http: Client,
filter: Option<fn(&str) -> bool>,
) -> (Self, UnboundedReceiver<HlsQueue>) {
let (tx, rx) = unbounded_channel();
let master_url = request
.url()
.join(".")
.expect("Could not join url with '.'.");
(
HlsWatch {
tx,
request,
http,
links: PatriciaSet::new(),
master_url,
timeout: Duration::from_secs(10),
fail_counter: 0,
filter,
},
rx,
)
}
pub async fn run(mut self) -> Result<(), Error> {
loop {
if self.fail_counter > HLS_MAX_RETRIES {
// There have either been errors or no new segments
// for `HLS_MAX_RETRIES` times the segment duration given
// in the m3u8 playlist file.
if self.tx.send(HlsQueue::StreamOver).is_err() {
return Err(Error::TIO(std::io::Error::last_os_error()));
};
break;
}
// Clone the request so we can reuse it in the loop.
let req = clone_request(&self.request, self.timeout);
let res = match self.http.execute(req).await {
Ok(r) => r,
Err(e) => {
warn!("[HLS] Playlist download failed!\n{}", e);
self.fail_counter += 1;
continue;
}
};
let m3u8_string = match res.text().await {
Ok(t) => t,
Err(e) => {
warn!("[HLS] Playlist text failed!\n{}", e);
self.fail_counter += 1;
continue;
}
};
let mut m3u8_parser = MediaPlaylist::builder();
// Allow excess segment duration because a lot of video sites have
// not very high quality m3u8 playlists, where the video segments,
// may be longer than what the file specifies as max.
m3u8_parser.allowable_excess_duration(Duration::from_secs(10));
let m3u8 = match m3u8_parser.parse(&m3u8_string) {
Ok(p) => p,
Err(e) => {
warn!("[HLS] Parsing failed!\n{}", e);
trace!("[HLS]\n{}", &m3u8_string);
self.fail_counter += 1;
continue;
}
};
// Get the target duration of a segment
let target_duration = m3u8.target_duration;
// Makes a iterator with the url parts from the playlist
for e in m3u8.segments.iter().map(|(_, e)| e.uri().trim()) {
trace!("[HLS] Tries to inserts: {}", e);
// Check if we have the segment in our set already
if self.links.insert(e) {
// Reset the counter as we got a new segment.
self.fail_counter = 0;
// Construct a url from the master and the segment.
let url_formatted = if let Ok(u) = Url::parse(e) {
u
} else {
// Attempt to parse the url as a relative url.
Url::parse(&format!("{}{}", self.master_url.as_str(), &e)).expect(
"The m3u8 does not currently work with stream_lib, \
please report the issue on the github repo, with an \
example of the file if possible.",
)
};
// Check that the filter runs.
if self.filter.map_or(true, |f| f(e)) {
debug!("[HLS] Adds {}!", url_formatted);
// Add the segment to the queue.
if self.tx.send(HlsQueue::Url(url_formatted)).is_err() {
return Err(Error::TIO(std::io::Error::last_os_error()));
};
}
}
}
if m3u8.has_end_list {
tracing::debug!("List has end, no more segments expected.");
break;
}
trace!("[HLS] Sleeps for {:#?}", target_duration);
// Sleeps for the target duration.
tokio::time::sleep(target_duration).await;
self.fail_counter += 1;
}
Ok(())
}
}