use std::time::Duration;
use async_trait::async_trait;
use bytes::Bytes;
use futures::Stream;
use log::debug;
use pingora_error::Result;
use tokio_stream::StreamExt;
// Submodules of this module; their contents are not visible from this file.
pub mod client;
pub mod server;
/// Queue size constant for custom messages (128 entries).
///
/// NOTE(review): nothing in this file's visible code reads this constant — presumably it is
/// the capacity of the custom-message channels set up in `client` / `server`; confirm there.
pub const CUSTOM_MESSAGE_QUEUE_SIZE: usize = 128;
/// Reports whether `code` is an informational (1xx) status code other than 101.
///
/// Returns `true` only when `code` is strictly greater than 99, strictly less than 200,
/// and not equal to 101. Any type comparable against `u32` is accepted.
pub fn is_informational_except_101<T>(code: T) -> bool
where
    T: PartialOrd<u32>,
{
    // Range test first, then the 101 exclusion, so comparisons short-circuit in the same
    // order for any `T`.
    let within_1xx = code > 99 && code < 200;
    within_1xx && code != 101
}
/// Write-side interface for custom messages.
///
/// Implementors must be `Send + Sync + Unpin + 'static`. The async methods are declared
/// through `async_trait`.
#[async_trait]
pub trait CustomMessageWrite: Send + Sync + Unpin + 'static {
/// Sets the write timeout to `timeout`.
///
/// NOTE(review): presumably `None` means "no timeout" — confirm against implementors.
fn set_write_timeout(&mut self, timeout: Option<Duration>);
/// Writes a single custom message `msg`, returning an error if the write fails.
async fn write_custom_message(&mut self, msg: Bytes) -> Result<()>;
/// Finishes the custom message writer.
///
/// NOTE(review): presumably signals that no further custom messages will be written;
/// the exact semantics are defined by implementors — confirm.
async fn finish_custom(&mut self) -> Result<()>;
}
// Placeholder `CustomMessageWrite` implementation for the unit type: nothing is written
// anywhere. The async methods only emit a debug log line and report success.
#[doc(hidden)]
#[async_trait]
impl CustomMessageWrite for () {
    // There is no underlying writer, so the timeout is ignored.
    fn set_write_timeout(&mut self, _: Option<Duration>) {}

    // Logs the message at debug level and drops it.
    async fn write_custom_message(&mut self, payload: Bytes) -> Result<()> {
        debug!("write_custom_message: {:?}", payload);
        Ok(())
    }

    // Logs the call at debug level; there is nothing to finish.
    async fn finish_custom(&mut self) -> Result<()> {
        debug!("finish_custom");
        Ok(())
    }
}
/// Write-side interface for message bodies.
///
/// Implementors must be `Send + Sync + Unpin + 'static`. The async methods are declared
/// through `async_trait`.
#[async_trait]
pub trait BodyWrite: Send + Sync + Unpin + 'static {
/// Writes the contents of `data`, returning an error if the write fails.
///
/// NOTE(review): `data` is taken by mutable reference — presumably implementors
/// consume/advance the buffer as they write; confirm against implementors.
async fn write_all_buf(&mut self, data: &mut Bytes) -> Result<()>;
/// Finishes the body.
///
/// NOTE(review): presumably marks the end of the body after the last write — confirm.
async fn finish(&mut self) -> Result<()>;
/// Performs implementor-defined cleanup of the writer.
///
/// NOTE(review): how this differs from `finish` is not visible here — confirm.
async fn cleanup(&mut self) -> Result<()>;
/// Switches the body writer into its "upgraded" mode.
///
/// NOTE(review): presumably used after a protocol upgrade (e.g. a 101 response);
/// the semantics are defined by implementors — confirm.
fn upgrade_body_writer(&mut self);
}
/// Reads the custom message stream `reader` until it ends, discarding every message.
///
/// Each successfully received message is only logged at debug level. When `reader` is
/// `None` there is nothing to drain and the function returns `Ok(())` immediately.
///
/// # Errors
///
/// Returns the first error yielded by the stream; nothing after that item is read.
pub async fn drain_custom_messages(
    reader: Option<Box<dyn Stream<Item = Result<Bytes>> + Unpin + Send + Sync + 'static>>,
) -> Result<()> {
    let mut stream = match reader {
        Some(stream) => stream,
        None => return Ok(()),
    };

    // `transpose` turns `Option<Result<_>>` into `Result<Option<_>>`, so `?` bails out on
    // the first stream error and the loop ends when the stream is exhausted.
    while let Some(msg) = stream.next().await.transpose()? {
        debug!("consume_custom_messages: {msg:?}");
    }

    Ok(())
}
/// Runs a method call (or call chain) on the custom session of an object, if it has one.
///
/// `custom_session!(obj.method(args))` calls `obj.as_custom_mut()`; when that yields
/// `Some(session)`, the tokens after the first `.` are applied to `session` and evaluated
/// as a statement. When it yields `None`, nothing else is evaluated.
///
/// The first token must be a plain identifier naming a value with an `as_custom_mut()`
/// method that returns an `Option`.
#[macro_export]
macro_rules! custom_session {
    ($owner:ident . $($call:tt)+) => {
        if let Some(session) = $owner.as_custom_mut() {
            // NOTE(review): presumably silences clippy on the expanded statement at call
            // sites; confirm the lint still fires without it before removing.
            #[allow(clippy::semicolon_if_nothing_returned)]
            session.$($call)+;
        }
    };
}