use crate::config_bag::{Storable, StoreReplace};
use crate::str_bytes::StrBytes;
use bytes::Bytes;
use std::any::Any;
use std::fmt;
use std::sync::{mpsc, Mutex};
mod value {
use crate::str_bytes::StrBytes;
use crate::DateTime;
use bytes::Bytes;
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq)]
pub enum HeaderValue {
Bool(bool),
Byte(i8),
Int16(i16),
Int32(i32),
Int64(i64),
ByteArray(Bytes),
String(StrBytes),
Timestamp(DateTime),
Uuid(u128),
}
impl HeaderValue {
pub fn as_bool(&self) -> Result<bool, &Self> {
match self {
HeaderValue::Bool(value) => Ok(*value),
_ => Err(self),
}
}
pub fn as_byte(&self) -> Result<i8, &Self> {
match self {
HeaderValue::Byte(value) => Ok(*value),
_ => Err(self),
}
}
pub fn as_int16(&self) -> Result<i16, &Self> {
match self {
HeaderValue::Int16(value) => Ok(*value),
_ => Err(self),
}
}
pub fn as_int32(&self) -> Result<i32, &Self> {
match self {
HeaderValue::Int32(value) => Ok(*value),
_ => Err(self),
}
}
pub fn as_int64(&self) -> Result<i64, &Self> {
match self {
HeaderValue::Int64(value) => Ok(*value),
_ => Err(self),
}
}
pub fn as_byte_array(&self) -> Result<&Bytes, &Self> {
match self {
HeaderValue::ByteArray(value) => Ok(value),
_ => Err(self),
}
}
pub fn as_string(&self) -> Result<&StrBytes, &Self> {
match self {
HeaderValue::String(value) => Ok(value),
_ => Err(self),
}
}
pub fn as_timestamp(&self) -> Result<DateTime, &Self> {
match self {
HeaderValue::Timestamp(value) => Ok(*value),
_ => Err(self),
}
}
pub fn as_uuid(&self) -> Result<u128, &Self> {
match self {
HeaderValue::Uuid(value) => Ok(*value),
_ => Err(self),
}
}
}
}
pub use value::HeaderValue;
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq)]
pub struct Header {
name: StrBytes,
value: HeaderValue,
}
impl Header {
pub fn new(name: impl Into<StrBytes>, value: impl Into<HeaderValue>) -> Header {
Header {
name: name.into(),
value: value.into(),
}
}
pub fn name(&self) -> &StrBytes {
&self.name
}
pub fn value(&self) -> &HeaderValue {
&self.value
}
}
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq)]
pub struct Message {
headers: Vec<Header>,
payload: Bytes,
}
impl Message {
pub fn new(payload: impl Into<Bytes>) -> Message {
Message {
headers: Vec::new(),
payload: payload.into(),
}
}
pub fn new_from_parts(headers: Vec<Header>, payload: impl Into<Bytes>) -> Self {
Self {
headers,
payload: payload.into(),
}
}
pub fn add_header(mut self, header: Header) -> Self {
self.headers.push(header);
self
}
pub fn headers(&self) -> &[Header] {
&self.headers
}
pub fn payload(&self) -> &Bytes {
&self.payload
}
}
#[derive(Debug)]
#[non_exhaustive]
pub enum RawMessage {
Decoded(Message),
Invalid(Option<Bytes>),
}
impl RawMessage {
pub fn invalid(bytes: Option<Bytes>) -> Self {
Self::Invalid(bytes)
}
}
#[derive(Debug)]
pub struct DeferredSignerSendError {
kind: DeferredSignerSendErrorKind,
}
#[derive(Debug)]
enum DeferredSignerSendErrorKind {
Closed,
}
impl fmt::Display for DeferredSignerSendError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.kind {
DeferredSignerSendErrorKind::Closed => f.write_str("receiver was dropped"),
}
}
}
impl std::error::Error for DeferredSignerSendError {}
#[derive(Debug)]
pub struct DeferredSignerRecvError {
kind: DeferredSignerRecvErrorKind,
}
#[derive(Debug)]
enum DeferredSignerRecvErrorKind {
NoSigner,
}
impl fmt::Display for DeferredSignerRecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.kind {
DeferredSignerRecvErrorKind::NoSigner => f.write_str("no signer was available"),
}
}
}
impl std::error::Error for DeferredSignerRecvError {}
#[derive(Debug)]
pub struct DeferredSignerReceiver {
rx: Mutex<Option<mpsc::Receiver<Box<dyn Any + Send + Sync>>>>,
}
impl DeferredSignerReceiver {
pub fn recv<T: Send + Sync + 'static>(&self) -> Result<T, DeferredSignerRecvError> {
let mut rx = self.rx.lock().unwrap();
rx.take()
.and_then(|r| r.try_recv().ok())
.and_then(|any| any.downcast::<T>().ok())
.map(|b| *b)
.ok_or(DeferredSignerRecvError {
kind: DeferredSignerRecvErrorKind::NoSigner,
})
}
}
#[derive(Debug)]
pub struct DeferredSignerSender {
tx: Mutex<mpsc::Sender<Box<dyn Any + Send + Sync>>>,
}
impl DeferredSignerSender {
pub fn new() -> (DeferredSignerReceiver, Self) {
let (tx, rx) = mpsc::channel();
(
DeferredSignerReceiver {
rx: Mutex::new(Some(rx)),
},
Self { tx: Mutex::new(tx) },
)
}
pub fn send<T: Send + Sync + 'static>(&self, value: T) -> Result<(), DeferredSignerSendError> {
self.tx
.lock()
.unwrap()
.send(Box::new(value))
.map_err(|_| DeferredSignerSendError {
kind: DeferredSignerSendErrorKind::Closed,
})
}
}
impl Storable for DeferredSignerSender {
type Storer = StoreReplace<Self>;
}
pub type SignMessageError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub trait SignMessage: fmt::Debug {
fn sign(&mut self, message: Message) -> Result<Message, SignMessageError>;
fn sign_empty(&mut self) -> Option<Result<Message, SignMessageError>>;
}