use crate::connector::errors::{StreamError, StreamResult};
use crate::connector::features::http::client::{ReqwestClient, ReqwestClientSpec};
use crate::connector::features::http::connector::HttpConnector;
use crate::connector::features::http::stream::actions::HttpAction;
use crate::connector::features::http::stream::descriptor::HttpDescriptor;
use crate::connector::features::http::stream::event::HttpEvent;
use crate::connector::features::shared::actions::StreamActionRaw;
use crate::connector::features::shared::clients_map::ClientsMap;
use crate::connector::features::shared::events::StreamEventRaw;
use crate::connector::features::shared::rate_limiter::RateLimitManager;
use crate::connector::{Hook, HookArgs, IntoHook, RuntimeCtx, StreamRunner, StreamSpawner};
use crate::io::ringbuffer::RingSender;
use crate::prelude::{BaseRx, TxPairExt};
use crate::utils::{Reducer, StateMarker};
use anyhow::anyhow;
use crossbeam::channel::unbounded;
use std::fmt::Debug;
use std::rc::Rc;
use std::time::Duration;
use tokio::runtime::Builder;
use tokio::sync::Mutex;
use tokio::task::LocalSet;
use tokio::time::sleep;
impl<E, R, S, T> StreamSpawner<HttpDescriptor<T>, E, R, S, T> for HttpConnector
where
E: TxPairExt,
S: StateMarker,
R: Reducer,
T: Debug + Clone + Send + 'static,
{
}
impl<E, R, S, T> StreamRunner<HttpDescriptor<T>, E, R, S, T> for HttpConnector
where
E: TxPairExt,
S: StateMarker,
R: Reducer,
T: Debug + Clone + Send + 'static,
{
type Config = ClientsMap<ReqwestClient, ReqwestClientSpec>;
type ActionTx = RingSender<StreamActionRaw<HttpAction>>;
type RawEvent = StreamEventRaw<HttpEvent>;
type HookResult = ();
fn build_config(&mut self, _desc: &HttpDescriptor<T>) -> anyhow::Result<Self::Config> {
Ok(self.clients_map())
}
fn run<H>(
mut ctx: RuntimeCtx<
ClientsMap<ReqwestClient, ReqwestClientSpec>,
HttpDescriptor<T>,
RingSender<StreamActionRaw<HttpAction>>,
E,
R,
S,
T,
>,
hook: H,
) -> StreamResult<()>
where
H: IntoHook<StreamEventRaw<HttpEvent>, E, R, S, HttpDescriptor<T>, (), T>,
E: TxPairExt,
S: StateMarker,
{
let mut hook = hook.into_hook();
let wait_async_tasks = Duration::from_micros(ctx.desc.wait_async_tasks_us);
let rl_manager = Rc::new(Mutex::new(RateLimitManager::new(
ctx.desc.rate_limits.clone(),
)));
let rt = Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
.map_err(|e| StreamError::Unknown(anyhow!("Tokio Runtime error: {e}")))?;
let local = LocalSet::new();
let (res_tx, res_rx) = unbounded();
ctx.health.up();
loop {
if ctx.cancel.is_cancelled() {
ctx.health.down();
break Err(StreamError::Cancelled);
}
while let Ok(mut action) = ctx.action_rx.try_recv() {
let Some(client) = action.conn_id().and_then(|id| ctx.cfg.get(&id)) else {
continue;
};
let inner = match action.inner_take() {
Some(inner) => inner,
None => continue,
};
let request = match inner
.to_request_builder(&client.as_ref().0, action.timeout())
.build()
{
Ok(request) => request,
Err(e) => {
let inner = HttpEvent::from_error(e);
let stream_event = StreamEventRaw::builder(None)
.conn_id(action.conn_id())
.req_id(action.req_id())
.label(action.label_take())
.payload(action.json_take())
.inner(inner)
.build()?;
let _ = res_tx.try_send(stream_event);
continue;
}
};
let res_tx = res_tx.clone();
let rl_manager = if action.rl_ctx().is_some() {
Some(rl_manager.clone())
} else {
None
};
local.spawn_local(async move {
if let Some(rl_manager) = rl_manager {
let plan = {
let mut mgr = rl_manager.lock().await;
mgr.plan(action.rl_ctx().as_ref().unwrap(), action.rl_weight())
};
if let Some(plan) = plan {
for (bucket, weight) in plan {
bucket.wait(weight).await;
}
}
}
let out = client.as_ref().0.execute(request).await;
let inner = match out {
Ok(resp) => HttpEvent::from_raw(resp).await,
Err(e) => HttpEvent::from_error(e),
};
let stream_event = StreamEventRaw::builder(None)
.conn_id(action.conn_id())
.req_id(action.req_id())
.label(action.label_take())
.payload(action.json_take())
.inner(inner)
.build()
.unwrap();
let _ = res_tx.try_send(stream_event);
});
}
let mut budget = ctx.desc.max_hook_calls_at_once;
while budget > 0 {
match res_rx.try_recv() {
Ok(event) => {
budget -= 1;
hook.call(HookArgs::new(
event,
&mut ctx.event_tx,
&mut ctx.reducer,
&ctx.state,
&ctx.desc,
&mut ctx.health,
))
}
Err(_) => break,
}
}
rt.block_on(local.run_until(async {
if wait_async_tasks.is_zero() {
tokio::task::yield_now().await;
} else {
sleep(wait_async_tasks).await;
}
}));
}
}
}