use crate::{
api::ApiProvider,
config::Config,
error::FavCoreError,
res::{Res, ResSet, ResSets},
FavCoreResult,
};
use core::future::Future;
use futures::StreamExt;
use protobuf::MessageFull;
use protobuf_json_mapping::{parse_from_str_with_options, ParseOptions};
use reqwest::{header, Client, Response};
use serde::de::DeserializeOwned;
use serde_json::Value;
const PARSE_OPTIONS: ParseOptions = ParseOptions {
ignore_unknown_fields: true,
_future_options: (),
};
#[allow(missing_docs)]
#[trait_variant::make(Operations: Send)]
pub trait LocalOperations<SS, S, R, K>: ApiProvider<K> + Config
where
SS: ResSets<S, R>,
S: ResSet<R>,
R: Res,
K: Send,
{
async fn login(&mut self) -> FavCoreResult<()>;
async fn logout(&mut self) -> FavCoreResult<()>;
async fn fetch_sets(&self, sets: &mut SS) -> FavCoreResult<()>;
async fn fetch_set(&self, set: &mut S) -> FavCoreResult<()>;
async fn fetch_res(&self, resource: &mut R) -> FavCoreResult<()>;
async fn pull(&self, resource: &mut R) -> FavCoreResult<()>;
fn client(&self) -> &'static Client {
use std::sync::OnceLock;
let headers = self.headers();
static CLIENT: OnceLock<Client> = OnceLock::new();
CLIENT.get_or_init(|| Client::builder().default_headers(headers).build().unwrap())
}
fn request(
&self,
api_kind: K,
params: &[&str], ) -> impl Future<Output = FavCoreResult<Response>> {
async {
let client = self.client();
let api = self.api(api_kind);
let cookie = self.cookie_value(api.cookie_keys());
let resp = client
.request(api.method(), api.url(params))
.header(header::COOKIE, cookie)
.send()
.await?;
Ok(resp)
}
}
fn request_json<T>(
&self,
api_kind: K,
params: &[&str],
pointer: &str,
) -> impl Future<Output = FavCoreResult<T>>
where
T: DeserializeOwned,
{
async {
let resp = self.request(api_kind, params).await?;
resp2json(resp, pointer).await
}
}
fn request_proto<T>(
&self,
api_kind: K,
params: &[&str],
pointer: &str,
) -> impl Future<Output = FavCoreResult<T>>
where
T: MessageFull,
{
async {
let json = self.request_json(api_kind, params, pointer).await?;
json2proto(&json)
}
}
}
pub trait OperationsExt<SS, S, R, K>: Operations<SS, S, R, K>
where
SS: ResSets<S, R>,
S: ResSet<R>,
R: Res,
K: Send,
{
fn fetch_all<'a>(&self, set: &'a mut S) -> impl Future<Output = FavCoreResult<()>>
where
R: 'a,
{
batch_process(set, |r| self.fetch_res(r))
}
fn pull_all<'a>(&self, set: &'a mut S) -> impl Future<Output = FavCoreResult<()>>
where
R: 'a,
{
batch_process(set, |r| self.pull(r))
}
}
async fn batch_process<'a, R, F, T, S>(set: &'a mut S, f: F) -> FavCoreResult<()>
where
R: Res + 'a,
F: FnMut(&'a mut R) -> T,
T: Future<Output = FavCoreResult<()>>,
S: ResSet<R>,
{
let mut stream = tokio_stream::iter(set.iter_mut())
.map(f)
.buffer_unordered(10);
while let Some(r) = stream.next().await {
if let Err(e) = r {
match e {
FavCoreError::Cancel => {
print_warn(e);
break;
}
_ => print_err(e),
}
}
}
Ok(())
}
impl<T, SS, S, R, K> OperationsExt<SS, S, R, K> for T
where
T: Operations<SS, S, R, K>,
SS: ResSets<S, R>,
S: ResSet<R>,
R: Res,
K: Send,
{
}
pub fn print_warn<T>(e: T)
where
T: std::fmt::Display,
{
#[cfg(not(feature = "tracing"))]
println!("{}", e);
#[cfg(feature = "tracing")]
tracing::warn!("{}", e);
}
pub fn print_err<E>(e: E)
where
E: std::error::Error,
{
#[cfg(not(feature = "tracing"))]
println!("{}", e);
#[cfg(feature = "tracing")]
tracing::error!("{}", e);
}
pub async fn resp2json<T>(resp: Response, pointer: &str) -> FavCoreResult<T>
where
T: DeserializeOwned,
{
match resp.json::<Value>().await?.pointer_mut(pointer) {
Some(json) => {
let ret = serde_json::from_value(json.clone())?;
Ok(ret)
}
None => Err(FavCoreError::SerdePointerNotFound),
}
}
pub fn json2proto<T>(json: &Value) -> FavCoreResult<T>
where
T: MessageFull,
{
let json = json.to_string();
Ok(parse_from_str_with_options(&json, &PARSE_OPTIONS)?)
}
pub async fn resp2proto<T>(resp: Response, pointer: &str) -> FavCoreResult<T>
where
T: MessageFull,
{
let json = resp2json::<Value>(resp, pointer).await?;
json2proto(&json)
}