use bigml::wait::{wait, BackoffType, WaitOptions, WaitStatus};
use itertools::Itertools;
use reqwest::Client;
use serde::Deserialize;
use serde_json::Value;
use std::{collections::HashMap, str::FromStr};
use tokio::{
sync::mpsc::Sender,
time::{sleep, Duration},
};
use super::ShopifyLocator;
use crate::common::*;
use crate::credentials::CredentialsManager;
use crate::json_to_csv::write_rows;
use crate::tokio_glue::{box_stream_once, bytes_channel, SendResultExt};
#[instrument(
level = "trace",
name = "shopify::local_data",
skip(ctx, shared_args, source_args)
)]
pub(crate) async fn local_data_helper(
ctx: Context,
source: ShopifyLocator,
shared_args: SharedArguments<Unverified>,
source_args: SourceArguments<Unverified>,
) -> Result<Option<BoxStream<CsvStream>>> {
let shared_args = shared_args.verify(ShopifyLocator::features())?;
let _source_args = source_args.verify(ShopifyLocator::features())?;
let schema = shared_args.schema().to_owned();
let mut url = source.to_https_url()?;
url.query_pairs_mut().append_pair("limit", "250").finish();
let creds = CredentialsManager::singleton().get("shopify").await?;
let auth_token = creds.get_required("auth_token")?.to_owned();
let mut include_headers = true;
let (mut sender, receiver) = bytes_channel(1);
let worker: BoxFuture<()> = async move {
let client = Client::new();
let mut next_url = url.clone();
loop {
let wait_options = WaitOptions::default()
.backoff_type(BackoffType::Exponential)
.retry_interval(Duration::from_secs(5))
.allowed_errors(3);
let result = wait(&wait_options, || {
let next_url = next_url.clone();
async {
let result =
get_shopify_response(&client, next_url, auth_token.to_owned())
.await;
match result {
Ok(resp) => WaitStatus::Finished(resp),
Err(err) => WaitStatus::FailedTemporarily(err),
}
}
})
.await;
let resp = match result {
Ok(resp) => resp,
Err(err) => {
error!("ERROR: {:?}", err);
sender.send(Err(err)).await.map_send_err()?;
return Ok(());
}
};
if let Err(err) = convert_rows_to_csv_and_send(
&mut sender,
&schema,
resp.rows,
include_headers,
)
.await
{
sender.send(Err(err)).await.map_send_err()?;
return Ok(());
}
include_headers = false;
if let Some(next_page_url) = resp.next_page_url {
next_url = next_page_url;
if resp.call_limit.should_wait() {
sleep(Duration::from_millis(1000)).await;
}
} else {
return Ok::<_, Error>(());
}
}
}
.boxed();
ctx.spawn_worker(worker);
Ok(Some(box_stream_once(Ok(CsvStream {
name: "data".to_owned(),
data: receiver.boxed(),
}))))
}
/// The parts of one Shopify REST API page that the driver needs, as assembled
/// by `get_shopify_response`.
#[derive(Debug)]
struct ShopifyResponse {
/// Call budget parsed from the `x-shopify-shop-api-call-limit` header.
call_limit: CallLimit,
/// URL of the following page, taken from the `next` relation of the `link`
/// header; `None` when the header or that relation is absent.
next_page_url: Option<Url>,
/// The JSON rows found under the single top-level key of the response body.
rows: Vec<Value>,
}
/// A `used/limit` pair describing how much of an API call budget has been
/// consumed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct CallLimit {
    /// Number of calls already counted against the budget.
    used: u32,
    /// Size of the budget.
    limit: u32,
}

impl CallLimit {
    /// Report whether at least half of the budget has been used, i.e. whether
    /// `2 * used >= limit`.
    fn should_wait(self) -> bool {
        match self.used.checked_mul(2) {
            Some(doubled) => doubled >= self.limit,
            // Doubling overflowed `u32`, so it exceeds every possible limit.
            None => true,
        }
    }
}
impl FromStr for CallLimit {
    type Err = Error;

    /// Parse a call limit written as `used/limit`, for example `"2/10"`.
    ///
    /// The string is split at the first `/`; both halves must parse as `u32`.
    /// Any failure is reported with the full input string in the message.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let slash_at = match s.find('/') {
            Some(idx) => idx,
            None => return Err(format_err!("could not parse call limit {:?}", s)),
        };

        // `tail` begins with the one-byte '/' separator, so skip past it.
        let (used_text, tail) = s.split_at(slash_at);
        let limit_text = &tail[1..];

        let parse_count = |text: &str| {
            text.parse::<u32>()
                .with_context(|| format!("could not parse call limit {:?}", s))
        };
        let used = parse_count(used_text)?;
        let limit = parse_count(limit_text)?;
        Ok(CallLimit { used, limit })
    }
}
/// A well-formed `used/limit` string parses into the matching `CallLimit`.
#[test]
fn parse_call_limit() {
    let parsed: CallLimit = "2/10".parse().unwrap();
    assert_eq!(parsed, CallLimit { used: 2, limit: 10 });
}
/// The JSON body of a Shopify response, deserialized as an object that maps
/// each top-level key to an array of JSON values. `into_rows` extracts the
/// array when the object has exactly one key.
#[derive(Deserialize)]
#[serde(transparent)]
struct RowsJson(HashMap<String, Vec<Value>>);
impl RowsJson {
fn into_rows(self) -> Result<Vec<Value>> {
if self.0.len() == 1 {
Ok(self
.0
.into_iter()
.next()
.expect("checked for exactly one value, didn't find it")
.1)
} else {
Err(format_err!(
"found multiple keys in Shopify response: {}",
self.0.keys().join(",")
))
}
}
}
/// Fetch a single page from the Shopify REST API.
///
/// Sends a GET request to `url` with `auth_token` in the
/// `X-Shopify-Access-Token` header. On a successful status the call limit is
/// read from the `x-shopify-shop-api-call-limit` header, the next page URL is
/// read from the `next` relation of the `link` header (if any), and the body
/// is decoded into rows.
///
/// # Errors
///
/// Fails if the request cannot be sent, if the status is not a success (the
/// status and response body are included in the message), if the call-limit
/// header is missing or malformed, if the `link` header cannot be parsed, or
/// if the body cannot be decoded into exactly one array of rows.
#[instrument(level = "trace", skip(client, auth_token))]
async fn get_shopify_response(
    client: &Client,
    url: Url,
    auth_token: String,
) -> Result<ShopifyResponse> {
    debug!("Fetching Shopify data");
    let resp: reqwest::Response = client
        .get(url)
        .header("X-Shopify-Access-Token", auth_token)
        .send()
        .await
        .context("error accessing Shopify REST API")?;

    // Bail out early on any non-success status, reporting the body we got.
    let status = resp.status();
    if !status.is_success() {
        let body = resp
            .text()
            .await
            .context("error reading Shopify error response")?;
        return Err(format_err!(
            "could not read data from Shopify: {} {}",
            status,
            body,
        ));
    }

    // Headers must be inspected before the body is consumed below.
    let call_limit_header = resp
        .headers()
        .get("x-shopify-shop-api-call-limit")
        .ok_or_else(|| {
            format_err!("could not find x-shopify-shop-api-call-limit header")
        })?;
    let call_limit = call_limit_header
        .to_str()
        .context("could not convert x-shopify-shop-api-call-limit to string")?
        .parse::<CallLimit>()?;

    let next_page_url = match resp.headers().get("link") {
        None => None,
        Some(raw_link) => {
            let link_text = raw_link
                .to_str()
                .context("could not convert link header to string")?;
            let relations = parse_link_header::parse(link_text)
                .map_err(|_| format_err!("error parsing Link header"))?;
            match relations.get(&Some("next".to_owned())) {
                None => None,
                Some(next) => {
                    Some(Url::from_str(&next.uri.to_string()).with_context(|| {
                        format_err!("could not parse URL {:?}", next)
                    })?)
                }
            }
        }
    };

    let rows_json = resp
        .json::<RowsJson>()
        .await
        .context("error fetching Shopify data")?;
    let rows = rows_json.into_rows()?;

    Ok(ShopifyResponse {
        call_limit,
        next_page_url,
        rows,
    })
}
/// Turn one batch of JSON rows into CSV bytes and push them to `sender`.
///
/// The rows are written with `write_rows` into an in-memory buffer using
/// `schema`; `include_headers` is passed through to `write_rows`. The buffer
/// contents are then copied into a `BytesMut` and sent as one `Ok` message.
///
/// # Errors
///
/// Fails if `write_rows` reports an error or if the message cannot be sent.
#[instrument(
    level = "trace",
    name = "convert_rows_to_csv_and_send",
    skip(sender, schema, rows)
)]
async fn convert_rows_to_csv_and_send(
    sender: &mut Sender<Result<BytesMut>>,
    schema: &Schema,
    rows: Vec<Value>,
    include_headers: bool,
) -> Result<()> {
    // Start with 8 KiB so small batches do not need to grow the buffer.
    let mut csv_data: Vec<u8> = Vec::with_capacity(8 * 1024);
    write_rows(&mut csv_data, schema, rows, include_headers)?;

    let chunk = BytesMut::from(csv_data.as_slice());
    sender.send(Ok(chunk)).await.map_send_err()?;
    Ok(())
}