use std::fmt::Debug;
use crate::connector::features::shared::rate_limiter::RateLimitConfig;
use crate::connector::{Kind, StreamDescriptor, Venue};
use crate::utils::CorePickPolicy;
use serde::Deserialize;
const fn default_outbound_buffer() -> usize {
64
}
#[derive(Debug, Clone, Deserialize)]
pub struct GrpcDescriptor<T> {
pub max_hook_calls_at_once: usize,
pub wait_async_tasks_us: u64,
pub max_pending_actions: Option<usize>,
pub max_pending_events: Option<usize>,
pub core_pick_policy: Option<CorePickPolicy>,
#[serde(default)]
pub rate_limits: Vec<RateLimitConfig>,
pub max_decoding_message_size: Option<usize>,
pub max_encoding_message_size: Option<usize>,
#[serde(default = "default_outbound_buffer")]
pub outbound_buffer: usize,
pub ctx: Option<T>,
}
impl<T> GrpcDescriptor<T> {
pub fn new(
max_hook_calls_at_once: Option<usize>,
wait_async_tasks_us: Option<u64>,
max_pending_actions: Option<usize>,
max_pending_events: Option<usize>,
core_pick_policy: Option<CorePickPolicy>,
rate_limits: Option<Vec<RateLimitConfig>>,
max_decoding_message_size: Option<usize>,
max_encoding_message_size: Option<usize>,
outbound_buffer: Option<usize>,
ctx: Option<T>,
) -> Self {
let mut descriptor = Self::default();
descriptor.max_hook_calls_at_once = max_hook_calls_at_once.filter(|&x| x > 0).unwrap_or(10);
descriptor.wait_async_tasks_us = wait_async_tasks_us.unwrap_or(100);
descriptor.max_pending_actions = max_pending_actions;
descriptor.max_pending_events = max_pending_events;
descriptor.core_pick_policy = core_pick_policy;
descriptor.rate_limits = rate_limits.unwrap_or_default();
descriptor.max_decoding_message_size = max_decoding_message_size;
descriptor.max_encoding_message_size = max_encoding_message_size;
descriptor.outbound_buffer = outbound_buffer.unwrap_or(default_outbound_buffer());
descriptor.ctx = ctx;
descriptor
}
pub fn low_latency() -> Self {
let mut descriptor = Self::default();
descriptor.max_hook_calls_at_once = 4;
descriptor.wait_async_tasks_us = 0;
descriptor
}
pub fn high_throughput() -> Self {
let mut descriptor = Self::default();
descriptor.max_hook_calls_at_once = 64;
descriptor.wait_async_tasks_us = 200;
descriptor
}
pub fn add_rate_limit(&mut self, rl: RateLimitConfig) {
self.rate_limits.push(rl);
}
}
impl<T> Default for GrpcDescriptor<T> {
fn default() -> Self {
Self {
max_hook_calls_at_once: 10,
wait_async_tasks_us: 100,
max_pending_actions: None,
max_pending_events: None,
core_pick_policy: None,
rate_limits: Vec::new(),
max_decoding_message_size: None,
max_encoding_message_size: None,
outbound_buffer: default_outbound_buffer(),
ctx: None,
}
}
}
impl<T: Debug + Clone + Send + 'static> StreamDescriptor<T> for GrpcDescriptor<T> {
fn venue(&self) -> impl Venue {
"any"
}
fn kind(&self) -> impl Kind {
"tonic"
}
fn max_pending_actions(&self) -> Option<usize> {
self.max_pending_actions
}
fn max_pending_events(&self) -> Option<usize> {
self.max_pending_events
}
fn core_pick_policy(&self) -> Option<CorePickPolicy> {
self.core_pick_policy
}
fn health_at_start(&self) -> bool {
false
}
fn context(&self) -> Option<&T> {
self.ctx.as_ref()
}
}