use super::S;
use crate::interface::Read;
use crate::{ActorError, Result, UniqueIdentifier, Who};
use async_trait::async_trait;
use flume::Receiver;
use std::any::type_name;
use std::fmt::Debug;
use std::{fmt::Display, sync::Arc};
use tokio::sync::Mutex;
/// One input of an actor: a channel receiver paired with the client that consumes its data.
///
/// Type parameters:
/// - `C`: the client type, which must implement `Read<U>`.
/// - `U`: the identifier type tagging the data carried by the channel.
/// - `T`: the data type associated with `U` (`U::DataType`).
/// - `N`: a const parameter that no code in this file reads.
pub(crate) struct Input<C, T, U, const N: usize>
where
    C: Read<U>,
    U: UniqueIdentifier<DataType = T>,
{
    /// Receiving end of the channel that delivers `S<U>` values.
    rx: Receiver<S<U>>,
    /// Client behind an async mutex; each received value is passed to its `read` method.
    client: Arc<Mutex<C>>,
    /// Numeric tag supplied at construction, reported by `get_hash` and shown by `Display`.
    hash: u64,
}
impl<C, T, U, const N: usize> Input<C, T, U, N>
where
U: UniqueIdentifier<DataType = T>,
C: Read<U>,
{
pub fn new(rx: Receiver<S<U>>, client: Arc<Mutex<C>>, hash: u64) -> Self {
Self { rx, client, hash }
}
}
// `Input` takes every `Who<U>` method from the trait's provided definitions;
// nothing is overridden here.
impl<C, T, U, const N: usize> Who<U> for Input<C, T, U, N>
where
    U: UniqueIdentifier<DataType = T>,
    C: Read<U>,
{
}
impl<C, T, U, const N: usize> Display for Input<C, T, U, N>
where
    U: UniqueIdentifier<DataType = T>,
    C: Read<U>,
{
    /// Writes `#<hash>: <who>`, with the hash right-aligned in a 19-character field.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = self.who();
        write!(out, "#{:>19}: {}", self.hash, label)
    }
}
impl<C, T, U, const N: usize> Debug for Input<C, T, U, N>
where
    U: UniqueIdentifier<DataType = T>,
    T: Debug,
    C: Read<U> + Debug,
{
    /// Renders the input as an `Input { rx, client, hash }` debug struct.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = out.debug_struct("Input");
        repr.field("rx", &self.rx);
        repr.field("client", &self.client);
        repr.field("hash", &self.hash);
        repr.finish()
    }
}
/// Type-erased interface to an input, usable as `dyn InputObject`.
///
/// Implementors must also be `Display`, `Send` and `Sync`.
#[async_trait]
pub(crate) trait InputObject: Send + Sync + Display {
    /// Asynchronously takes in one piece of data.
    ///
    /// The `Input` implementation in this file waits for a value on its channel
    /// and passes it to the client's `read` method.
    async fn recv(&mut self) -> Result<()>;
    /// Returns a textual name for this input.
    fn who(&self) -> String;
    /// Returns the numeric tag attached to this input.
    fn get_hash(&self) -> u64;
    /// Returns the capacity of the underlying channel, if it reports one.
    fn capacity(&self) -> Option<usize>;
}
impl Debug for Box<dyn InputObject> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Debug::fmt(&std::ops::Deref::deref(&self), f)
}
}
#[async_trait]
impl<C, T, U, const N: usize> InputObject for Input<C, T, U, N>
where
C: Read<U> + Send,
T: Send + Sync,
U: Send + Sync + UniqueIdentifier<DataType = T>,
{
async fn recv(&mut self) -> Result<()> {
log::debug!("{} receiving", Who::highlight(self));
let mut client = self.client.lock().await;
(*client).read(
self.rx
.recv_async()
.await
.map_err(|e| ActorError::DropRecv {
msg: Who::lite(self),
source: e,
})?,
);
log::debug!("{} received ({})", Who::highlight(self), type_name::<C>());
Ok(())
}
fn who(&self) -> String {
Who::who(self)
}
fn get_hash(&self) -> u64 {
self.hash
}
fn capacity(&self) -> Option<usize> {
self.rx.capacity()
}
}