use crate::{
api::ApiProvider,
config::HttpConfig,
error::FavCoreError,
res::{Res, Set, Sets},
FavCoreResult,
};
use core::future::Future;
use futures::StreamExt;
use protobuf::MessageFull;
use protobuf_json_mapping::{parse_from_str_with_options, ParseOptions};
use reqwest::{header, Client, Response};
use serde::de::DeserializeOwned;
use serde_json::Value;
use tracing::error;
// Options handed to `parse_from_str_with_options` by `json2proto`.
// `ignore_unknown_fields: true` asks the JSON->protobuf parser to ignore JSON
// keys that have no matching field in the target message.
const PARSE_OPTIONS: ParseOptions = ParseOptions {
ignore_unknown_fields: true,
// Field of `ParseOptions` that must be spelled out in a struct literal; it carries no data.
_future_options: (),
};
/// Umbrella trait bundling `AuthOps`, `SetsOps`, `SetOps` and `ResOps`.
///
/// It declares no methods of its own. The `ApiKind` type parameter is not used
/// by any bound visible here.
pub trait Ops<ApiKind>: AuthOps + SetsOps + SetOps + ResOps {}
/// Authentication operations for a type that is `Net + HttpConfig`.
///
/// The `trait_variant::make(AuthOps: Send)` attribute also produces the
/// `AuthOps` variant of this trait (the one `Ops` requires), declared with a
/// `Send` bound.
#[trait_variant::make(AuthOps: Send)]
pub trait LocalAuthOps: Net + HttpConfig {
/// Logs in. Takes `&mut self`, so implementors may update their own state.
async fn login(&mut self) -> FavCoreResult<()>;
/// Logs out. Takes `&mut self`, so implementors may update their own state.
async fn logout(&mut self) -> FavCoreResult<()>;
}
/// Operations on a whole collection of sets.
///
/// The `trait_variant::make(SetsOps: Send)` attribute also produces the
/// `SetsOps` variant of this trait, declared with a `Send` bound.
#[trait_variant::make(SetsOps: Send)]
pub trait LocalSetsOps: Net + HttpConfig {
/// The concrete `Sets` collection this implementor works with.
type Sets: Sets;
/// Fetches into `sets`, which is updated in place through the `&mut` reference.
// NOTE(review): what exactly gets written into `sets` is up to the implementor — not visible here.
async fn fetch_sets(&self, sets: &mut Self::Sets) -> FavCoreResult<()>;
}
/// Operations on a single set.
///
/// The `trait_variant::make(SetOps: Send)` attribute also produces the
/// `SetOps` variant of this trait (the supertrait of `SetOpsExt`), declared
/// with a `Send` bound.
#[trait_variant::make(SetOps: Send)]
pub trait LocalSetOps: Net + HttpConfig {
/// The concrete `Set` type this implementor works with.
type Set: Set;
/// Fetches into `set`, which is updated in place through the `&mut` reference.
// NOTE(review): what exactly gets written into `set` is up to the implementor — not visible here.
async fn fetch_set(&self, set: &mut Self::Set) -> FavCoreResult<()>;
}
/// Operations on a single resource.
///
/// The `trait_variant::make(ResOps: Send)` attribute also produces the
/// `ResOps` variant of this trait (the supertrait of `ResOpsExt`), declared
/// with a `Send` bound.
#[trait_variant::make(ResOps: Send)]
pub trait LocalResOps: Net + HttpConfig {
/// The concrete `Res` type this implementor works with.
type Res: Res;
/// Fetches into `resource`, which is updated in place through the `&mut` reference.
// NOTE(review): presumably metadata only, as opposed to `pull_res` — confirm with implementors.
async fn fetch_res(&self, resource: &mut Self::Res) -> FavCoreResult<()>;
/// Pulls `resource`; it is passed by `&mut`, so it may be updated in place.
// NOTE(review): presumably downloads the resource content — confirm with implementors.
async fn pull_res(&self, resource: &mut Self::Res) -> FavCoreResult<()>;
}
pub trait Net: HttpConfig + ApiProvider {
fn client(&self) -> &'static Client {
use std::sync::OnceLock;
let headers = self.headers();
static CLIENT: OnceLock<Client> = OnceLock::new();
CLIENT.get_or_init(|| Client::builder().default_headers(headers).build().unwrap())
}
fn request(
&self,
api_kind: Self::ApiKind,
params: Vec<String>, ) -> impl Future<Output = FavCoreResult<Response>> {
async {
let client = self.client();
let api = self.api(api_kind);
let cookie = self.cookie_value(api.cookie_keys());
let resp = client
.request(api.method(), api.url(params))
.header(header::COOKIE, cookie)
.send()
.await?;
Ok(resp)
}
}
fn request_json<T>(
&self,
api_kind: Self::ApiKind,
params: Vec<String>,
pointer: &str,
) -> impl Future<Output = FavCoreResult<T>>
where
T: DeserializeOwned,
{
async {
let resp = self.request(api_kind, params).await?;
resp2json(resp, pointer).await
}
}
fn request_proto<T>(
&self,
api_kind: Self::ApiKind,
params: Vec<String>,
pointer: &str,
) -> impl Future<Output = FavCoreResult<T>>
where
T: MessageFull,
{
async {
let json = self.request_json(api_kind, params, pointer).await?;
json2proto(&json)
}
}
}
// Blanket impl: every `HttpConfig + ApiProvider` type gets `Net`'s default methods.
impl<T> Net for T where T: HttpConfig + ApiProvider {}
/// Batch helper layered on top of [`SetOps`].
pub trait SetOpsExt: SetOps {
    /// Runs `fetch_set` on every set yielded by `sets.iter_mut()`, driven by
    /// [`batch_op_set`].
    fn batch_fetch_set<'a, SS>(&self, sets: &'a mut SS) -> impl Future<Output = FavCoreResult<()>>
    where
        SS: Sets<Set = Self::Set>,
    {
        batch_op_set(sets, |single_set| self.fetch_set(single_set))
    }
}
/// Applies `f` to every set in `set`, running a bounded number of the
/// resulting futures concurrently.
///
/// Results are handled as they complete, in no particular order:
/// - `FavCoreError::Cancel` stops the batch and is returned;
/// - any other error is logged with `tracing::error!` and the batch continues.
///
/// The batch also listens for Ctrl-C and returns `FavCoreError::Cancel` when
/// that listener resolves.
///
/// # Errors
///
/// Returns `FavCoreError::Cancel` if an operation reports cancellation or the
/// Ctrl-C listener resolves.
pub async fn batch_op_set<'a, SS, F, T>(set: &'a mut SS, f: F) -> FavCoreResult<()>
where
    SS: Sets + 'a,
    F: FnMut(&'a mut SS::Set) -> T,
    T: Future<Output = FavCoreResult<()>>,
{
    /// Upper bound on operations in flight at once.
    const MAX_IN_FLIGHT: usize = 8;

    let mut stream = tokio_stream::iter(set.iter_mut())
        .map(f)
        .buffer_unordered(MAX_IN_FLIGHT);
    // One listener for the whole batch: creating it inside the loop would drop
    // and rebuild it after every completed item.
    let mut interrupt = std::pin::pin!(tokio::signal::ctrl_c());
    loop {
        tokio::select! {
            res = stream.next() => {
                match res {
                    None => break,
                    Some(Err(FavCoreError::Cancel)) => return Err(FavCoreError::Cancel),
                    Some(Err(e)) => error!("{}", e),
                    Some(Ok(())) => {}
                }
            }
            _ = &mut interrupt => {
                return Err(FavCoreError::Cancel);
            }
        }
    }
    Ok(())
}
// Blanket impl: every `SetOps` implementor gets the `SetOpsExt` batch helper.
impl<T> SetOpsExt for T where T: SetOps {}
/// Batch helpers layered on top of [`ResOps`].
pub trait ResOpsExt: ResOps {
    /// Runs `fetch_res` on every resource yielded by `set.iter_mut()`, driven
    /// by [`batch_op_res`].
    fn batch_fetch_res<'a, S>(&self, set: &'a mut S) -> impl Future<Output = FavCoreResult<()>>
    where
        S: Set<Res = Self::Res>,
    {
        batch_op_res(set, |resource| self.fetch_res(resource))
    }

    /// Runs `pull_res` on every resource yielded by `set.iter_mut()`, driven
    /// by [`batch_op_res`].
    fn batch_pull_res<'a, S>(&self, set: &'a mut S) -> impl Future<Output = FavCoreResult<()>>
    where
        S: Set<Res = Self::Res>,
    {
        batch_op_res(set, |resource| self.pull_res(resource))
    }
}
/// Applies `f` to every resource in `set`, running a bounded number of the
/// resulting futures concurrently.
///
/// Results are handled as they complete, in no particular order:
/// - `FavCoreError::Cancel` stops the batch and is returned;
/// - a `FavCoreError::NetworkError` whose inner error reports `is_connect()`
///   stops the batch and is returned;
/// - any other error is logged with `tracing::error!` and the batch continues.
///
/// The batch also listens for Ctrl-C and returns `FavCoreError::Cancel` when
/// that listener resolves.
///
/// # Errors
///
/// Returns `FavCoreError::Cancel` on cancellation or when the Ctrl-C listener
/// resolves, and the connect-level `FavCoreError::NetworkError` described above.
pub async fn batch_op_res<'a, S, F, T>(set: &'a mut S, f: F) -> FavCoreResult<()>
where
    S: Set + 'a,
    F: FnMut(&'a mut S::Res) -> T,
    T: Future<Output = FavCoreResult<()>>,
{
    /// Upper bound on operations in flight at once.
    const MAX_IN_FLIGHT: usize = 8;

    let mut stream = tokio_stream::iter(set.iter_mut())
        .map(f)
        .buffer_unordered(MAX_IN_FLIGHT);
    // One listener for the whole batch: creating it inside the loop would drop
    // and rebuild it after every completed item.
    let mut interrupt = std::pin::pin!(tokio::signal::ctrl_c());
    loop {
        tokio::select! {
            res = stream.next() => {
                match res {
                    None => break,
                    Some(Err(FavCoreError::Cancel)) => return Err(FavCoreError::Cancel),
                    // A connect failure stops the batch instead of being logged per item.
                    Some(Err(FavCoreError::NetworkError(e))) if e.is_connect() => {
                        return Err(FavCoreError::NetworkError(e));
                    }
                    Some(Err(e)) => error!("{}", e),
                    Some(Ok(())) => {}
                }
            }
            _ = &mut interrupt => {
                return Err(FavCoreError::Cancel);
            }
        }
    }
    Ok(())
}
// Blanket impl: every `ResOps` implementor gets the `ResOpsExt` batch helpers.
impl<T> ResOpsExt for T where T: ResOps {}
/// Reads the response body as JSON and deserializes the value located at
/// `pointer` into `T`.
///
/// `pointer` is looked up with `serde_json::Value::pointer_mut`; the matched
/// value is moved out of the document rather than cloned.
///
/// # Errors
///
/// - the converted error if the body cannot be read as JSON;
/// - `FavCoreError::SerdePointerNotFound` if `pointer` matches nothing;
/// - the converted error if the matched value does not deserialize into `T`.
pub async fn resp2json<T>(resp: Response, pointer: &str) -> FavCoreResult<T>
where
    T: DeserializeOwned,
{
    let mut document = resp.json::<Value>().await?;
    let node = document
        .pointer_mut(pointer)
        .ok_or(FavCoreError::SerdePointerNotFound)?;
    let parsed = serde_json::from_value(node.take())?;
    Ok(parsed)
}
/// Converts a JSON [`Value`] into the protobuf message `T`.
///
/// The value is rendered to a JSON string and parsed with
/// `parse_from_str_with_options` using `PARSE_OPTIONS`.
///
/// # Errors
///
/// Returns the converted parse error if the JSON cannot be mapped onto `T`.
pub fn json2proto<T>(json: &Value) -> FavCoreResult<T>
where
    T: MessageFull,
{
    let text = json.to_string();
    let message = parse_from_str_with_options(&text, &PARSE_OPTIONS)?;
    Ok(message)
}
/// Extracts the JSON value at `pointer` from the response body (via
/// [`resp2json`]) and converts it into the protobuf message `T` (via
/// [`json2proto`]).
///
/// # Errors
///
/// Returns any error from either step.
pub async fn resp2proto<T>(resp: Response, pointer: &str) -> FavCoreResult<T>
where
    T: MessageFull,
{
    let extracted: Value = resp2json(resp, pointer).await?;
    json2proto(&extracted)
}