use crate::{
socket_connection::common::{
ReactiveMessagingSender,
},
};
use std::{
fmt::{Debug, Formatter},
net::SocketAddr,
time::Duration,
};
use reactive_mutiny::prelude::advanced::{
MutinyStream,
FullDuplexUniChannel,
};
use tokio::sync::Mutex;
use crate::config::ConstConfig;
use crate::socket_connection::connection::{ConnectionId, SocketConnection};
pub struct Peer<const CONFIG: u64,
LocalMessages: Send + Sync + PartialEq + Debug + 'static,
SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
StateType: Send + Sync + Clone + Debug + 'static = ()> {
pub peer_id: ConnectionId,
pub peer_address: SocketAddr,
pub state: Mutex<Option<StateType>>,
retryable_sender: ReactiveMessagingSender<CONFIG, LocalMessages, SenderChannel>,
}
impl<const CONFIG: u64,
LocalMessages: Send + Sync + PartialEq + Debug,
SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
StateType: Send + Sync + Clone + Debug>
Peer<CONFIG, LocalMessages, SenderChannel, StateType> {
pub fn new(retryable_sender: ReactiveMessagingSender<CONFIG, LocalMessages, SenderChannel>, peer_address: SocketAddr, connection: &SocketConnection<StateType>) -> Self {
Self {
peer_id: connection.id(),
peer_address,
state: Mutex::new(Some(connection.state().clone())),
retryable_sender,
}
}
pub fn config(&self) -> ConstConfig {
ConstConfig::from(CONFIG)
}
pub fn create_stream(&self) -> (MutinyStream<'static, LocalMessages, SenderChannel, LocalMessages>, u32) {
self.retryable_sender.create_stream()
}
#[inline(always)]
pub fn send(&self,
message: LocalMessages)
-> Result<(), (/*abort_the_connection?*/bool, /*error_message: */String)> {
self.retryable_sender.send(message)
}
#[inline(always)]
pub async fn send_async(&self,
message: LocalMessages)
-> Result<(), (/*abort_the_connection?*/bool, /*error_message: */String)> {
self.retryable_sender.send_async_trait(message).await
}
pub async fn set_state(&self, state: StateType) {
*self.state.lock().await = Some(state);
}
pub fn blocking_set_state(&self, state: StateType) {
loop {
if let Ok(mut locked_state) = self.state.try_lock() {
locked_state.replace(state);
break
} else {
std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop();
std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop();
std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop();
std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop();
std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop();
}
}
}
#[must_use = "This operation is not guaranteed to succeed. You must retry or propagate the status. If you can't use async `set_state()`, use `blocking_set_state()` instead."]
pub fn try_set_state(&self, state: StateType) -> bool {
if let Ok(mut locked_state) = self.state.try_lock() {
locked_state.replace(state);
true
} else {
false
}
}
pub async fn take_state(&self) -> Option<StateType> {
self.state.lock().await.take()
}
pub fn try_take_state(&self) -> Option<Option<StateType>> {
if let Ok(mut locked_state) = self.state.try_lock() {
Some(locked_state.take())
} else {
None
}
}
#[inline(always)]
pub fn pending_items_count(&self) -> u32 {
self.retryable_sender.pending_items_count()
}
#[inline(always)]
pub fn buffer_size(&self) -> u32 {
self.retryable_sender.buffer_size()
}
pub async fn flush_and_close(&self, timeout: Duration) -> u32 {
self.retryable_sender.flush_and_close(timeout).await
}
pub fn cancel_and_close(&self) {
self.retryable_sender.cancel_and_close();
}
}
impl<const CONFIG: u64,
LocalMessages: Send + Sync + PartialEq + Debug,
SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
StateType: Send + Sync + Clone + Debug>
Debug for
Peer<CONFIG, LocalMessages, SenderChannel, StateType> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Peer {{peer_id: {}, peer_address: '{}', state: '{:?}', sender: {}/{} pending messages}}",
self.peer_id, self.peer_address, self.state.try_lock(), self.retryable_sender.pending_items_count(), self.retryable_sender.buffer_size())
}
}