pub mod grpc;
#[cfg(feature = "kafka-reporter")]
pub mod kafka;
pub mod print;
#[cfg(feature = "management")]
use crate::proto::v3::{InstancePingPkg, InstanceProperties};
use crate::proto::v3::{LogData, MeterData, SegmentObject};
use serde::{Deserialize, Serialize};
use std::{error::Error, ops::Deref, sync::Arc};
use tokio::sync::{OnceCell, mpsc};
use tonic::async_trait;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub enum CollectItem {
Trace(Box<SegmentObject>),
Log(Box<LogData>),
Meter(Box<MeterData>),
#[cfg(feature = "management")]
Instance(Box<InstanceProperties>),
#[cfg(feature = "management")]
Ping(Box<InstancePingPkg>),
}
impl CollectItem {
#[cfg(feature = "kafka-reporter")]
pub(crate) fn encode_to_vec(self) -> Vec<u8> {
use prost::Message;
match self {
CollectItem::Trace(item) => item.encode_to_vec(),
CollectItem::Log(item) => item.encode_to_vec(),
CollectItem::Meter(item) => item.encode_to_vec(),
#[cfg(feature = "management")]
CollectItem::Instance(item) => item.encode_to_vec(),
#[cfg(feature = "management")]
CollectItem::Ping(item) => item.encode_to_vec(),
}
}
}
pub(crate) type DynReport = dyn Report + Send + Sync + 'static;
pub trait Report {
fn report(&self, item: CollectItem);
}
impl Report for () {
fn report(&self, _item: CollectItem) {}
}
impl<T: Report> Report for Box<T> {
fn report(&self, item: CollectItem) {
Report::report(self.deref(), item)
}
}
impl<T: Report> Report for Arc<T> {
fn report(&self, item: CollectItem) {
Report::report(self.deref(), item)
}
}
impl<T: Report> Report for OnceCell<T> {
fn report(&self, item: CollectItem) {
Report::report(self.get().expect("OnceCell is empty"), item)
}
}
pub trait CollectItemProduce: Send + Sync + 'static {
fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error>>;
}
impl CollectItemProduce for () {
fn produce(&self, _item: CollectItem) -> Result<(), Box<dyn Error>> {
Ok(())
}
}
impl CollectItemProduce for mpsc::Sender<CollectItem> {
fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error>> {
Ok(self.blocking_send(item)?)
}
}
impl CollectItemProduce for mpsc::UnboundedSender<CollectItem> {
fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error>> {
Ok(self.send(item)?)
}
}
pub type ConsumeResult = Result<Option<CollectItem>, Box<dyn Error + Send>>;
#[async_trait]
pub trait CollectItemConsume: Send + Sync + 'static {
async fn consume(&mut self) -> ConsumeResult;
async fn try_consume(&mut self) -> ConsumeResult;
}
#[async_trait]
impl CollectItemConsume for () {
async fn consume(&mut self) -> ConsumeResult {
Ok(None)
}
async fn try_consume(&mut self) -> ConsumeResult {
Ok(None)
}
}
#[async_trait]
impl CollectItemConsume for mpsc::Receiver<CollectItem> {
async fn consume(&mut self) -> ConsumeResult {
Ok(self.recv().await)
}
async fn try_consume(&mut self) -> ConsumeResult {
use mpsc::error::TryRecvError;
match self.try_recv() {
Ok(item) => Ok(Some(item)),
Err(e) => match e {
TryRecvError::Empty => Ok(None),
TryRecvError::Disconnected => Err(Box::new(e)),
},
}
}
}
#[async_trait]
impl CollectItemConsume for mpsc::UnboundedReceiver<CollectItem> {
async fn consume(&mut self) -> ConsumeResult {
Ok(self.recv().await)
}
async fn try_consume(&mut self) -> ConsumeResult {
use mpsc::error::TryRecvError;
match self.try_recv() {
Ok(item) => Ok(Some(item)),
Err(e) => match e {
TryRecvError::Empty => Ok(None),
TryRecvError::Disconnected => Err(Box::new(e)),
},
}
}
}