use crate::{
serde::ReactiveMessagingSerializer,
socket_connection::common::{
ReactiveMessagingSender,
},
};
use std::{
fmt::{Debug, Formatter},
net::SocketAddr,
time::Duration,
};
use reactive_mutiny::prelude::advanced::{
MutinyStream,
FullDuplexUniChannel,
};
use tokio::sync::Mutex;
use crate::socket_connection::connection::{ConnectionId, SocketConnection};
pub struct Peer<const CONFIG: u64,
LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug + 'static,
SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
StateType: Send + Sync + Clone + Debug + 'static = ()> {
pub peer_id: ConnectionId,
pub peer_address: SocketAddr,
pub state: Mutex<Option<StateType>>,
retryable_sender: ReactiveMessagingSender<CONFIG, LocalMessages, SenderChannel>,
}
impl<const CONFIG: u64,
LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug,
SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
StateType: Send + Sync + Clone + Debug>
Peer<CONFIG, LocalMessages, SenderChannel, StateType> {
pub fn new(retryable_sender: ReactiveMessagingSender<CONFIG, LocalMessages, SenderChannel>, peer_address: SocketAddr, connection: &SocketConnection<StateType>) -> Self {
Self {
peer_id: connection.id(),
peer_address,
state: Mutex::new(Some(connection.state().clone())),
retryable_sender,
}
}
pub fn create_stream(&self) -> (MutinyStream<'static, LocalMessages, SenderChannel, LocalMessages>, u32) {
self.retryable_sender.create_stream()
}
#[inline(always)]
pub fn send(&self,
message: LocalMessages)
-> Result<(), (bool, String)> {
self.retryable_sender.send(message)
}
#[inline(always)]
pub async fn send_async(&self,
message: LocalMessages)
-> Result<(), (bool, String)> {
self.retryable_sender.send_async_trait(message).await
}
pub async fn set_state(&self, state: StateType) {
*self.state.lock().await = Some(state);
}
pub fn try_set_state(&self, state: StateType) -> bool {
if let Ok(mut locked_state) = self.state.try_lock() {
*locked_state = Some(state);
true
} else {
false
}
}
pub async fn take_state(&self) -> Option<StateType> {
self.state.lock().await.take()
}
pub fn try_take_state(&self) -> Option<Option<StateType>> {
if let Ok(mut locked_state) = self.state.try_lock() {
Some(locked_state.take())
} else {
None
}
}
#[inline(always)]
pub fn pending_items_count(&self) -> u32 {
self.retryable_sender.pending_items_count()
}
#[inline(always)]
pub fn buffer_size(&self) -> u32 {
self.retryable_sender.buffer_size()
}
pub async fn flush_and_close(&self, timeout: Duration) -> u32 {
self.retryable_sender.flush_and_close(timeout).await
}
pub fn cancel_and_close(&self) {
self.retryable_sender.cancel_and_close();
}
}
impl<const CONFIG: u64,
LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug,
SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
StateType: Send + Sync + Clone + Debug>
Debug for
Peer<CONFIG, LocalMessages, SenderChannel, StateType> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Peer {{peer_id: {}, peer_address: '{}', state: '{:?}', sender: {}/{} pending messages}}",
self.peer_id, self.peer_address, self.state.try_lock(), self.retryable_sender.pending_items_count(), self.retryable_sender.buffer_size())
}
}