pub mod config;
pub mod instapaper;
pub mod pinboard;
#[path = "url-stream.rs"]
pub mod url_stream;
pub mod vars;
use async_trait::async_trait;
use indicatif::{ProgressBar, ProgressIterator, ProgressStyle};
use snafu::{IntoError, ResultExt, Snafu};
use strfmt::strfmt;
use tracing::{debug, trace};
use std::{cmp::max, collections::HashMap, fmt::Debug, sync::atomic::Ordering, time::Duration};
/// The standard two-parameter `Result`; the crate-level [`Result`] alias below fixes the error
/// type to [`Error`], so this spelling is used where a different error type is needed.
type StdResult<T, E> = std::result::Result<T, E>;
/// Errors returned by the public operations of this module.
#[derive(Debug, Snafu)]
pub enum Error {
    /// An I/O error, e.g. while writing output.
    #[snafu(display("{}", source))]
    Io { source: std::io::Error },
    /// A request was rate-limited more times than the caller's `max_retries` allows.
    #[snafu(display("The maximum number of retries was exceeded"))]
    MaxRetriesExceeded,
    /// An error reported by the Pinboard client.
    #[snafu(display("Pinboard API error {}", source))]
    Pinboard { source: pinboard::Error },
    /// A [`Sender`] failed for a reason other than rate-limiting.
    #[snafu(display("While sending, got {}", source))]
    Send { source: SendError },
}
/// Result type used by the public operations of this module.
pub type Result<T> = std::result::Result<T, Error>;
/// Write every Pinboard tag together with its use count to `out`.
///
/// The tags are fetched with `client.get_all_tags()` and sorted either by name (`alpha`) or
/// by use count. Ties in use count are broken by tag name, so the listing is the same from
/// run to run. `desc` reverses the final order.
///
/// When `csv` is true each tag is written as a bare `tag,count` line. Otherwise a table with
/// a `Tag` / `Use Count` header framed by `+---+---+` rules is written. Each column is sized
/// to its widest cell, header included.
///
/// # Errors
///
/// Returns [`Error::Pinboard`] if the tags cannot be fetched, and [`Error::Io`] if writing to
/// `out` fails.
#[tracing::instrument]
pub async fn get_tags<W: std::io::Write + std::fmt::Debug>(
    out: &mut W,
    client: &pinboard::Client,
    alpha: bool,
    desc: bool,
    csv: bool,
) -> Result<()> {
    // Column headings for the table layout; they also set each column's minimum width.
    const TAG_HEADER: &str = "Tag";
    const COUNT_HEADER: &str = "Use Count";

    // Fill a row template (built below) with one tag and its count.
    fn render_row(template: &str, tag: &str, count: &str) -> String {
        let mut vars: HashMap<String, &str> = HashMap::new();
        vars.insert(String::from("tag"), tag);
        vars.insert(String::from("uc"), count);
        strfmt(template, &vars).expect("row template only references `tag` and `uc`")
    }

    let mut tags = client
        .get_all_tags()
        .await
        .context(PinboardSnafu)?
        .drain()
        .collect::<Vec<(String, usize)>>();

    if alpha {
        tags.sort_unstable_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
    } else {
        // Tags arrive in hash-map order, so break count ties by name to keep the listing
        // deterministic.
        tags.sort_unstable_by(|lhs, rhs| lhs.1.cmp(&rhs.1).then_with(|| lhs.0.cmp(&rhs.0)));
    }
    if desc {
        tags.reverse();
    }

    if csv {
        for (tag, count) in &tags {
            writeln!(out, "{},{}", tag, count).context(IoSnafu)?;
        }
        return Ok(());
    }

    // Each column must fit its widest cell, *including* the header; otherwise the header row
    // would overflow the rule when every tag is shorter than "Tag". Widths are byte lengths.
    let tag_width = max(
        TAG_HEADER.len(),
        tags.iter().map(|(tag, _)| tag.len()).max().unwrap_or(0),
    );
    let max_count = tags.iter().map(|(_, count)| *count).max().unwrap_or(0);
    let count_width = max(COUNT_HEADER.len(), max_count.to_string().len());

    let rule = format!(
        "+{}+{}+",
        "-".repeat(tag_width + 2),
        "-".repeat(count_width + 2)
    );
    // Build the row template (e.g. "| {tag:<12} | {uc:9} |") once; strfmt fills it per row.
    let mut widths: HashMap<String, usize> = HashMap::new();
    widths.insert(String::from("1"), tag_width);
    widths.insert(String::from("2"), count_width);
    let template = strfmt("| {{tag:<{1}}} | {{uc:{2}}} |", &widths)
        .expect("template escapes are well-formed and both width keys are present");

    writeln!(out, "{}", render_row(&template, TAG_HEADER, COUNT_HEADER)).context(IoSnafu)?;
    writeln!(out, "{}", rule).context(IoSnafu)?;
    for (tag, count) in &tags {
        writeln!(out, "{}", render_row(&template, tag, &count.to_string())).context(IoSnafu)?;
    }
    writeln!(out, "{}", rule).context(IoSnafu)?;
    Ok(())
}
/// Send every request produced by `reqs`, pacing them with a multiplicative back-off.
///
/// Consecutive sends are spaced at least `beta_ms` milliseconds apart, measured from the
/// start of one send to the start of the next; the very first send is not delayed.
/// - After each successful send, `beta_ms` is halved.
/// - When a send reports [`SendError::TooManyRequests`], `beta_ms` is doubled, kept at
///   least 1 ms so it can grow again after being halved to zero, and capped at
///   `max_beta_ms`. The same request is then retried.
///
/// A progress bar of length `len` tracks the requests as they are pulled from `reqs`.
///
/// Returns the final `beta_ms`, so callers can carry the pacing over to a later batch.
///
/// # Errors
///
/// * [`Error::MaxRetriesExceeded`] if a single request is rate-limited more than
///   `max_retries` times; the retry budget is reset for every request.
/// * [`Error::Send`] if a send fails for any reason other than rate-limiting.
#[tracing::instrument]
pub async fn make_requests_with_backoff<I, T>(
    len: usize,
    reqs: I,
    mut beta_ms: u64,
    max_beta_ms: u64,
    max_retries: usize,
) -> Result<u64>
where
    I: Iterator<Item = T> + Debug,
    T: Sender + Debug,
{
    // Monotonic clock: wall-clock adjustments must not distort the pacing.
    use std::time::Instant;

    let reqs = reqs.progress_with(
        ProgressBar::new(u64::try_from(len).unwrap_or(u64::MAX))
            .with_style(ProgressStyle::default_bar().progress_chars(".| ")),
    );
    // Start of the most recent send; `None` until the first request goes out.
    let mut last_sent: Option<Instant> = None;
    for req in reqs {
        // Rate-limit retries are budgeted per request.
        let mut retries = 0;
        loop {
            if let Some(last) = last_sent {
                let elapsed = last.elapsed();
                trace!("elapsed is {}", elapsed.as_millis());
                let backoff = Duration::from_millis(beta_ms).saturating_sub(elapsed);
                debug!("Sleeping for {}ms...", backoff.as_millis());
                tokio::time::sleep(backoff).await;
                debug!("Sleeping for {}ms...done.", backoff.as_millis());
            }
            last_sent = Some(Instant::now());
            match req.send().await {
                Ok(()) => {
                    beta_ms /= 2;
                    trace!("beta :=> {}", beta_ms);
                    break;
                }
                Err(SendError::TooManyRequests) => {
                    // Doubling zero is still zero, so floor at 1ms; and honour the ceiling,
                    // which previously only applied on u64 overflow.
                    beta_ms = beta_ms.saturating_mul(2).max(1).min(max_beta_ms);
                    trace!("beta :=> {}", beta_ms);
                    retries += 1;
                    if retries > max_retries {
                        return Err(Error::MaxRetriesExceeded);
                    }
                }
                Err(err) => return Err(SendSnafu.into_error(err)),
            }
        }
    }
    Ok(beta_ms)
}
/// Failure modes of a single [`Sender::send`] call.
#[derive(Debug, Snafu)]
pub enum SendError {
    /// The remote service rate-limited the request; [`make_requests_with_backoff`] backs off
    /// and retries the same request.
    #[snafu(display("Rate-limited"))]
    TooManyRequests,
    /// Any other failure, with the underlying error boxed; [`make_requests_with_backoff`]
    /// returns it rather than retrying.
    /// NOTE(review): the boxed source carries no `Send + Sync` bound — confirm that is intended.
    #[snafu(display("Request failure: {source}"))]
    Failure { source: Box<dyn std::error::Error> },
}
/// A single request that can be sent, and re-sent, by [`make_requests_with_backoff`].
#[async_trait]
pub trait Sender {
    /// Perform the request once.
    ///
    /// Report rate-limiting as [`SendError::TooManyRequests`] so the caller backs off and
    /// retries; any other error is returned to the caller without a retry.
    async fn send(&self) -> StdResult<(), SendError>;
}
/// An Instapaper link paired with the client that will submit it; `&InstapaperPost`
/// implements [`Sender`].
#[derive(Debug)]
pub struct InstapaperPost<'a> {
    /// Client used to submit `post`.
    client: &'a instapaper::Client,
    /// The link to submit via `send_link`.
    post: instapaper::Post,
}
#[async_trait]
impl Sender for &InstapaperPost<'_> {
async fn send(&self) -> StdResult<(), SendError> {
match self.client.send_link(&self.post).await {
Ok(_) => Ok(()),
Err(instapaper::Error::RateLimit) => Err(SendError::TooManyRequests),
Err(err) => Err(FailureSnafu.into_error(Box::new(err))),
}
}
}
impl<'a> InstapaperPost<'a> {
    /// Pair an Instapaper `post` with the `client` that will submit it.
    pub fn new(client: &'a instapaper::Client, post: instapaper::Post) -> InstapaperPost<'a> {
        Self { post, client }
    }
}
/// A Pinboard bookmark paired with the client that will add it, optionally followed by an
/// Instapaper post that is sent once the Pinboard request succeeds.
#[derive(Debug)]
pub struct PinboardPost<'a, 'b> {
    /// Client used to add `post`.
    client: &'a pinboard::Client,
    /// The bookmark to add.
    post: pinboard::Post,
    /// Optional Instapaper follow-up, as a tuple of:
    /// - the post;
    /// - the shared back-off in milliseconds (read before, and updated after, the Instapaper
    ///   request);
    /// - the maximum back-off in milliseconds;
    /// - the maximum number of retries.
    insty: Option<(
        InstapaperPost<'b>,
        std::sync::Arc<std::sync::atomic::AtomicU64>,
        u64,
        usize,
    )>,
}
#[async_trait]
impl Sender for PinboardPost<'_, '_> {
async fn send(&self) -> StdResult<(), SendError> {
match self.client.send_post(&self.post).await {
Ok(_) => {
if let Some((insty_post, beta, max_beta, max_retries)) = &self.insty {
let new_beta = make_requests_with_backoff(
1,
std::iter::once(insty_post),
beta.load(Ordering::Relaxed),
*max_beta,
*max_retries,
)
.await
.map_err(|err| FailureSnafu.into_error(Box::new(err)))?;
beta.store(new_beta, Ordering::Relaxed);
}
Ok(())
}
Err(pinboard::Error::RateLimit) => Err(SendError::TooManyRequests),
Err(err) => Err(FailureSnafu.into_error(Box::new(err))),
}
}
}
impl<'a, 'b> PinboardPost<'a, 'b> {
    /// Pair a Pinboard `pin_post` with `pin_client`, optionally attaching an Instapaper
    /// follow-up.
    ///
    /// `instapaper` is (client, post, shared back-off in ms, maximum back-off in ms,
    /// maximum retries).
    pub fn new(
        pin_client: &'a pinboard::Client,
        pin_post: pinboard::Post,
        instapaper: Option<(
            &'b instapaper::Client,
            instapaper::Post,
            std::sync::Arc<std::sync::atomic::AtomicU64>,
            u64,
            usize,
        )>,
    ) -> PinboardPost<'a, 'b> {
        let insty = instapaper.map(
            |(insty_client, insty_post, shared_beta, beta_ceiling, retry_budget)| {
                let wrapped = InstapaperPost::new(insty_client, insty_post);
                (wrapped, shared_beta, beta_ceiling, retry_budget)
            },
        );
        Self {
            client: pin_client,
            post: pin_post,
            insty,
        }
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use reqwest::{StatusCode, Url};
    use test_log::test;
    use tracing::error;
    use std::{
        collections::VecDeque,
        str::FromStr,
        sync::{Arc, Mutex},
        time::{Duration, Instant},
    };
    /// A tiny scripted HTTP server for exercising the back-off logic.
    ///
    /// Each accepted connection carries one request. Its request line is checked against the
    /// next scripted `(prefix, status)` pair. On a match the server replies with that status;
    /// otherwise it replies `428 Precondition Required`. The gaps between the arrivals of
    /// consecutive requests are recorded in `deltas`.
    struct MockTestServer {
        addr: Url,
        expected: VecDeque<(String, StatusCode)>,
        deltas: Vec<Duration>,
        last_received: Option<Instant>,
    }
    impl MockTestServer {
        /// Bind to an ephemeral localhost port, serve the scripted replies in the
        /// background, and return a shared handle for inspecting the server's state.
        pub async fn new<'a, I>(iter: I) -> Arc<Mutex<MockTestServer>>
        where
            I: IntoIterator<Item = &'a (&'static str, StatusCode)>,
        {
            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
            let mock = Arc::new(Mutex::new(MockTestServer {
                addr: Url::parse(&format!("http://{}", listener.local_addr().unwrap())).unwrap(),
                expected: iter
                    .into_iter()
                    .map(|pair| (pair.0.into(), pair.1))
                    .collect::<VecDeque<(String, StatusCode)>>(),
                deltas: Vec::new(),
                last_received: None,
            }));
            let server_mock = mock.clone();
            tokio::spawn(async move {
                loop {
                    let (mut stream, _) = listener.accept().await.unwrap();
                    let inner_mock = server_mock.clone();
                    tokio::spawn(async move {
                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
                        // Read up to the end of the request head; these requests have no body.
                        let mut incoming: Vec<u8> = Vec::new();
                        let mut buf = [0u8; 1024];
                        while !incoming.ends_with(b"\r\n\r\n") {
                            let read = stream.read(&mut buf).await.unwrap();
                            if read == 0 {
                                // The peer hung up before sending a full request: nothing to
                                // answer, and reading again would just spin on EOF.
                                return;
                            }
                            incoming.extend_from_slice(&buf[..read]);
                        }
                        let now = Instant::now();
                        let incoming = std::str::from_utf8(&incoming).unwrap();
                        // Note the arrival and claim the scripted reply under one lock,
                        // *before* responding. The client cannot send its next request until
                        // it has this response, so arrivals are always recorded in order.
                        let expected = {
                            let mut inner_mock = inner_mock.lock().unwrap();
                            inner_mock.note_receipt_of_request(now);
                            inner_mock.pop_front()
                        };
                        let response = if incoming.starts_with(&expected.0) {
                            // Pinboard's add endpoint answers with a result code; the other
                            // scripted endpoint (Instapaper's add) answers with the bookmark.
                            let body = if expected.0.ends_with("/v1/posts/add") {
                                "{\"result_code\":\"done\"}"
                            } else {
                                "{\"bookmark_id\": 1530898236}"
                            };
                            format!("HTTP/1.1 {}\r\n\r\n{}\r\n", expected.1, body)
                        } else {
                            String::from("HTTP/1.1 428 Precondition Required\r\n\r\n")
                        };
                        stream.write_all(response.as_bytes()).await.unwrap();
                    });
                }
            });
            mock
        }
        /// Gaps between the arrivals of consecutive requests, in arrival order.
        pub fn deltas(&self) -> Vec<Duration> {
            self.deltas.clone()
        }
        /// Record that a request arrived at `recvd`.
        pub fn note_receipt_of_request(&mut self, recvd: Instant) {
            if let Some(last_received) = self.last_received {
                self.deltas.push(recvd.duration_since(last_received));
            }
            self.last_received = Some(recvd);
        }
        /// Take the next scripted `(request-line prefix, status)` pair.
        pub fn pop_front(&mut self) -> (String, StatusCode) {
            self.expected.pop_front().unwrap()
        }
        /// Base URL the server is listening on.
        pub fn server_url(&self) -> Url {
            self.addr.clone()
        }
    }
    /// Drive two Pinboard posts (each with an Instapaper follow-up) through the back-off
    /// logic and check the spacing between requests.
    ///
    /// Expected timeline (outer beta starts at 3000ms, shared Instapaper beta at 1000ms):
    /// 0. Pinboard 429 → outer beta 6000, retry after ~6000ms.
    /// 1. Pinboard OK (beta 3000) → Instapaper sent immediately (~0ms) and succeeds
    ///    (shared beta 500).
    /// 2. Next Pinboard ~3000ms after the previous one.
    /// 3. Instapaper immediately after it (~0ms).
    /// 4. Instapaper 400 twice → ~1000ms, then ~2000ms between attempts.
    #[test(tokio::test)]
    async fn backoff_test() {
        let mock = MockTestServer::new(&[
            ("GET /v1/posts/add", StatusCode::TOO_MANY_REQUESTS),
            ("GET /v1/posts/add", StatusCode::OK),
            ("GET /api/add", StatusCode::OK),
            ("GET /v1/posts/add", StatusCode::OK),
            ("GET /api/add", StatusCode::BAD_REQUEST),
            ("GET /api/add", StatusCode::BAD_REQUEST),
            ("GET /api/add", StatusCode::OK),
        ])
        .await;
        let insty_client;
        let pin_client;
        {
            let guard = mock.lock().unwrap();
            pin_client =
                pinboard::Client::new(guard.server_url(), "sp1ff:FFFFFFFFFFFFFFFFFFFF").unwrap();
            insty_client = instapaper::Client::new(guard.server_url(), "sp1ff@pobox.com", "c0fee")
                .expect("Failed to build client");
        }
        let atom = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1000));
        make_requests_with_backoff(
            2,
            vec![
                PinboardPost::new(
                    &pin_client,
                    pinboard::Post::new(
                        Url::parse("https://foo.com").unwrap(),
                        pinboard::Title::from_str("Frobinator").unwrap(),
                        vec![].into_iter(),
                        true,
                    ),
                    Some((
                        &insty_client,
                        instapaper::Post::new("https://foo.com", Some("Frobinator"), None).unwrap(),
                        atom.clone(),
                        10000,
                        5,
                    )),
                ),
                PinboardPost::new(
                    &pin_client,
                    pinboard::Post::new(
                        Url::parse("https://bar.com").unwrap(),
                        pinboard::Title::from_str("Bar none!").unwrap(),
                        vec![].into_iter(),
                        true,
                    ),
                    Some((
                        &insty_client,
                        instapaper::Post::new("https://bar.com", Some("Bar none!"), None).unwrap(),
                        atom.clone(),
                        10000,
                        5,
                    )),
                ),
            ]
            .into_iter(),
            3000,
            10000,
            5,
        )
        .await
        .unwrap_or_else(|err| {
            error!("{}", err);
            panic!();
        });
        let deltas = mock.lock().unwrap().deltas();
        // Print whatever arrived before asserting, so a short count is still diagnosable
        // (indexing first would panic without showing anything).
        for (i, delta) in deltas.iter().enumerate() {
            eprintln!("deltas[{}]: {}", i, delta.as_millis());
        }
        fn abs_diff_millis(lhs: Duration, rhs: Duration) -> u128 {
            if lhs <= rhs {
                rhs.checked_sub(lhs).unwrap_or(Duration::MAX).as_millis()
            } else {
                lhs.checked_sub(rhs).unwrap_or(Duration::MAX).as_millis()
            }
        }
        assert_eq!(6, deltas.len());
        assert!(abs_diff_millis(deltas[0], Duration::from_millis(6000)) <= 250);
        assert!(deltas[1].as_millis() <= 250);
        assert!(abs_diff_millis(deltas[2], Duration::from_millis(3000)) <= 250);
        assert!(deltas[3].as_millis() <= 250);
        assert!(abs_diff_millis(deltas[4], Duration::from_millis(1000)) <= 550);
        assert!(abs_diff_millis(deltas[5], Duration::from_millis(2000)) <= 250);
    }
}
#[cfg(test)]
mod proto {
    use super::*;
    /// Smoke test for the progress-bar style used when sending requests.
    ///
    /// Wraps a small iterator in a styled `ProgressBar` and checks that every element passes
    /// through unchanged and in order.
    #[test]
    fn smoke() {
        let x = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let bar = ProgressBar::new(x.len().try_into().unwrap())
            .with_style(ProgressStyle::default_bar().progress_chars(".| "));
        let mut seen = Vec::with_capacity(x.len());
        for item in x.iter().progress_with(bar) {
            // A short pause keeps the bar visible under `--nocapture` without adding seconds
            // to the test suite (the original paused 500ms per item).
            std::thread::sleep(std::time::Duration::from_millis(50));
            seen.push(*item);
        }
        assert_eq!(seen, x);
    }
}