use std::sync::Arc;
use memchr::memchr;
use parking_lot::Mutex;
pub struct FastMessageParser {
delimiter: u8,
}
impl FastMessageParser {
#[must_use]
pub fn new(delimiter: char) -> Self {
Self {
delimiter: delimiter as u8,
}
}
#[must_use]
pub fn parse_messages<'a>(&self, buffer: &'a [u8]) -> Vec<&'a [u8]> {
let mut messages = Vec::new();
let mut start = 0;
while let Some(pos) = memchr(self.delimiter, &buffer[start..]) {
let abs_pos = start + pos;
if abs_pos > start {
messages.push(&buffer[start..abs_pos]);
}
start = abs_pos + 1;
}
if start < buffer.len() {
messages.push(&buffer[start..]);
}
messages
}
pub fn parse_json_messages(
&self,
buffer: &[u8],
) -> Result<Vec<serde_json::Value>, serde_json::Error> {
let message_slices = self.parse_messages(buffer);
let mut results = Vec::new();
for slice in message_slices {
if let Ok(s) = std::str::from_utf8(slice)
&& let Ok(value) = serde_json::from_str(s.trim())
{
results.push(value);
}
}
Ok(results)
}
}
pub struct ChannelAggregator<T> {
inputs: Vec<crate::channels::core::RxFuture<T>>,
output: crate::channels::core::TxFuture<T>,
}
impl<T: Send + 'static + Clone> ChannelAggregator<T> {
#[must_use]
pub fn new(
inputs: Vec<crate::channels::core::RxFuture<T>>,
output: crate::channels::core::TxFuture<T>,
) -> Self {
Self { inputs, output }
}
pub fn start(self) {
for receiver in self.inputs {
let output = self.output.clone();
smol::spawn(async move {
let rx = receiver;
while let Ok(msg) = rx.recv().await {
let _ = output.send(msg).await;
}
})
.detach();
}
}
}
pub struct BatchingChannel<T> {
tx: crate::channels::core::TxFuture<Vec<T>>,
rx: crate::channels::core::RxFuture<Vec<T>>,
batch_size: usize,
current_batch: Arc<Mutex<Vec<T>>>,
}
impl<T: Clone + Send + 'static> BatchingChannel<T> {
#[must_use]
pub fn new(batch_size: usize, capacity: usize) -> Self {
let (tx, rx) = crate::channels::core::bounded_queue_3(capacity);
Self {
tx,
rx,
batch_size,
current_batch: Arc::new(Mutex::new(Vec::with_capacity(batch_size))),
}
}
pub async fn send(&self, item: T) -> Result<(), Box<dyn std::error::Error>> {
let should_flush = {
let mut batch = self.current_batch.lock();
batch.push(item);
batch.len() >= self.batch_size
};
if should_flush {
self.flush_batch().await?;
}
Ok(())
}
pub async fn flush_batch(&self) -> Result<(), Box<dyn std::error::Error>> {
let batch = {
let mut current = self.current_batch.lock();
std::mem::take(&mut *current)
};
if !batch.is_empty() {
self.tx.send(batch).await?;
}
Ok(())
}
#[must_use]
pub fn batch_receiver(&self) -> crate::channels::core::RxFuture<Vec<T>> {
self.rx.clone()
}
}
pub struct FilteredChannel<T, F> {
tx: crate::channels::core::TxFuture<T>,
filter: F,
}
impl<T: Send + 'static, F: Fn(&T) -> bool + Send + Sync + 'static> FilteredChannel<T, F> {
pub fn new(sender: crate::channels::core::TxFuture<T>, filter: F) -> Self {
Self { tx: sender, filter }
}
pub async fn send_filtered(&self, msg: T) -> Result<(), smol::channel::SendError<T>> {
if (self.filter)(&msg) {
self.tx.send(msg).await
} else {
Ok(()) }
}
}