pub fn stream_comments<R, I>(
    subreddit: &Subreddit,
    sleep_time: Duration,
    retry_strategy: R,
    timeout: Option<Duration>,
) -> (impl Stream<Item = Result<CommentData, StreamError<RouxError>>>, JoinHandle<Result<(), SendError>>)
Stream new comments in a subreddit.
Creates a separate tokio task that regularly polls the subreddit for new comments. Previously unseen comments are sent into the returned stream.
Returns a tuple (stream, join_handle) where stream is the
Stream from which the comments can be read, and
join_handle is the JoinHandle for the
polling task.
sleep_time controls the interval between calls to the Reddit API; a
suitable value depends on how much traffic the subreddit has. Each call
fetches the 100 latest items (the maximum number allowed by Reddit). A
warning is logged if none of those items was seen in the previous call:
this indicates that new content may have been missed and suggests
choosing a smaller sleep_time. Enable debug logging for more statistics.
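The "previously unseen" check and the missed-content warning can be pictured with a small standard-library sketch. This is not the crate's actual implementation: PollOutcome and process_batch are hypothetical names, items are reduced to plain string IDs, and suppressing the warning on the very first poll is an assumption.

```rust
use std::collections::HashSet;

/// Result of processing one polling round (hypothetical type).
#[derive(Debug, PartialEq)]
pub struct PollOutcome {
    /// IDs that were not seen in any earlier round, in fetch order.
    pub new_ids: Vec<String>,
    /// True when none of the fetched items was already known, which
    /// suggests that items may have been missed between two polls.
    pub possible_miss: bool,
}

/// Hypothetical helper: compares the latest batch against the set of
/// IDs seen so far and records the new ones. Assumes that IDs are
/// unique within a single batch.
pub fn process_batch(seen: &mut HashSet<String>, latest: &[&str]) -> PollOutcome {
    // On the very first round nothing can be known yet, so no warning.
    let first_round = seen.is_empty();
    let mut any_known = false;
    let mut new_ids = Vec::new();
    for id in latest {
        // `insert` returns true only if the ID was not present before.
        if seen.insert((*id).to_string()) {
            new_ids.push((*id).to_string());
        } else {
            any_known = true;
        }
    }
    let possible_miss = !first_round && !latest.is_empty() && !any_known;
    PollOutcome { new_ids, possible_miss }
}
```

If two consecutive batches overlap, only the non-overlapping IDs come back as new; if they do not overlap at all, possible_miss is set, which corresponds to the situation in which a smaller sleep_time is advisable.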
If timeout is not None then calls to the Reddit API that take longer
than timeout are aborted with a StreamError::TimeoutError.
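The meaning of the optional timeout can be illustrated with a standard-library sketch. It is only an analogy: the crate works with asynchronous tokio tasks, whereas this version uses a worker thread and a channel, and the names CallError and call_with_timeout are hypothetical. Unlike an aborted asynchronous call, the worker thread here keeps running after the deadline; only its result is discarded.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Ways in which a time-limited call can fail (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum CallError {
    /// The call did not finish within the given limit.
    TimedOut,
    /// The worker ended without producing a value (for example, it panicked).
    WorkerFailed,
}

/// Runs `f` and waits for its result. With `None` there is no limit and
/// `f` runs on the calling thread; with `Some(limit)` the wait is bounded.
pub fn call_with_timeout<T, F>(timeout: Option<Duration>, f: F) -> Result<T, CallError>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let limit = match timeout {
        Some(limit) => limit,
        None => return Ok(f()),
    };
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // After a timeout the receiver is gone; a failed send is ignored.
        let _ = tx.send(f());
    });
    rx.recv_timeout(limit).map_err(|err| match err {
        RecvTimeoutError::Timeout => CallError::TimedOut,
        RecvTimeoutError::Disconnected => CallError::WorkerFailed,
    })
}
```

A slow call with a short limit yields CallError::TimedOut, while passing None waits for as long as the call takes.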
If an error occurs while fetching the latest comments from Reddit then
fetching is retried according to retry_strategy (see tokio_retry for
details). If one of the retries succeeds then normal operation is resumed.
If retry_strategy is finite and the last retry fails then its error is
sent into the stream; afterwards, normal operation is resumed.
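The retry behaviour can be sketched synchronously with the standard library alone. The crate itself relies on tokio_retry and asynchronous tasks; retry_with below is a hypothetical helper that only mirrors the idea of a strategy as an iterator of delays: one initial attempt, then one further attempt per delay, with the last error returned once the strategy is exhausted.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical helper: calls `op` once and then, while it keeps
/// failing, once more after each delay produced by `strategy`.
/// Returns the first success, or the last error if every attempt fails.
pub fn retry_with<I, F, T, E>(strategy: I, mut op: F) -> Result<T, E>
where
    I: IntoIterator<Item = Duration>,
    F: FnMut() -> Result<T, E>,
{
    // The first attempt happens immediately, without any delay.
    let mut last = op();
    for delay in strategy {
        if last.is_ok() {
            break;
        }
        // Wait as dictated by the strategy, then try again.
        thread::sleep(delay);
        last = op();
    }
    last
}
```

With a strategy of three delays, an operation that always fails is attempted four times before its final error is returned. In the streaming function, that final error is what ends up in the stream before polling continues.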
The spawned task runs indefinitely unless an error is encountered when
sending data into the stream (for example because the receiver is dropped).
In that case the task stops and the error is returned via join_handle.
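The shutdown behaviour follows a common producer pattern, shown here as a sketch with a standard-library thread and channel instead of a tokio task and stream. spawn_producer is a hypothetical name, and the produced values are plain counters rather than comments.

```rust
use std::sync::mpsc::{self, Receiver, SendError};
use std::thread::{self, JoinHandle};

/// Hypothetical producer: sends an increasing counter until the
/// receiver is dropped, then reports the send error via the handle.
pub fn spawn_producer() -> (Receiver<u64>, JoinHandle<Result<(), SendError<u64>>>) {
    // A bounded channel keeps the producer from running far ahead.
    let (tx, rx) = mpsc::sync_channel(1);
    let handle = thread::spawn(move || {
        let mut n = 0u64;
        loop {
            // `send` fails once the receiver has been dropped; `?` then
            // ends the loop and returns the error through the handle.
            tx.send(n)?;
            n += 1;
        }
    });
    (rx, handle)
}
```

Dropping the receiver is therefore enough to stop the producer, and joining the handle afterwards yields the send error, much as awaiting join_handle does for the polling task.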
See also stream_submissions.
Example
The following example prints new comments to r/AskReddit in an endless loop.
use futures::StreamExt;
use roux::Subreddit;
use roux_stream::stream_comments;
use std::time::Duration;
use tokio_retry::strategy::ExponentialBackoff;
#[tokio::main]
async fn main() {
    let subreddit = Subreddit::new("AskReddit");

    // How often to retry when pulling the data from Reddit fails and
    // how long to wait between retries. See the docs of `tokio_retry`
    // for details.
    let retry_strategy = ExponentialBackoff::from_millis(5).factor(100).take(3);

    let (mut stream, join_handle) = stream_comments(
        &subreddit,
        Duration::from_secs(10),
        retry_strategy,
        Some(Duration::from_secs(10)),
    );

    while let Some(comment) = stream.next().await {
        // `comment` is an `Err` if getting the latest comments
        // from Reddit failed even after retrying.
        let comment = comment.unwrap();
        println!(
            "{}{} (by u/{})",
            comment.link_url.unwrap(),
            comment.id.unwrap(),
            comment.author.unwrap()
        );
    }

    // In case there was an error sending the comments through the
    // stream, `join_handle` will report it.
    join_handle.await.unwrap().unwrap();
}