reactive_messaging/socket_connection/
peer.rs1use crate::{
4 serde::ReactiveMessagingSerializer,
5 socket_connection::common::{
6 ReactiveMessagingSender,
7 },
8};
9use std::{
10 fmt::{Debug, Formatter},
11 net::SocketAddr,
12 time::Duration,
13};
14use reactive_mutiny::prelude::advanced::{
15 MutinyStream,
16 FullDuplexUniChannel,
17};
18use tokio::sync::Mutex;
19use crate::config::ConstConfig;
20use crate::socket_connection::connection::{ConnectionId, SocketConnection};
21
22
23pub struct Peer<const CONFIG: u64,
28 LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug + 'static,
29 SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
30 StateType: Send + Sync + Clone + Debug + 'static = ()> {
31 pub peer_id: ConnectionId,
33 pub peer_address: SocketAddr,
34 pub state: Mutex<Option<StateType>>,
35 retryable_sender: ReactiveMessagingSender<CONFIG, LocalMessages, SenderChannel>,
36}
37
38impl<const CONFIG: u64,
39 LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug,
40 SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
41 StateType: Send + Sync + Clone + Debug>
42Peer<CONFIG, LocalMessages, SenderChannel, StateType> {
43
44 pub fn new(retryable_sender: ReactiveMessagingSender<CONFIG, LocalMessages, SenderChannel>, peer_address: SocketAddr, connection: &SocketConnection<StateType>) -> Self {
45 Self {
46 peer_id: connection.id(),
47 peer_address,
48 state: Mutex::new(Some(connection.state().clone())),
49 retryable_sender,
50 }
51 }
52
53 pub fn config(&self) -> ConstConfig {
54 ConstConfig::from(CONFIG)
55 }
56
57 pub fn create_stream(&self) -> (MutinyStream<'static, LocalMessages, SenderChannel, LocalMessages>, u32) {
59 self.retryable_sender.create_stream()
60 }
61
62
63 #[inline(always)]
64 pub fn send(&self,
65 message: LocalMessages)
66 -> Result<(), (bool, String)> {
67 self.retryable_sender.send(message)
68 }
69
70 #[inline(always)]
71 pub async fn send_async(&self,
72 message: LocalMessages)
73 -> Result<(), (bool, String)> {
74
75 self.retryable_sender.send_async_trait(message).await
76 }
77
78 pub async fn set_state(&self, state: StateType) {
82 *self.state.lock().await = Some(state);
83 }
84
85 pub fn try_set_state(&self, state: StateType) -> bool {
87 if let Ok(mut locked_state) = self.state.try_lock() {
88 *locked_state = Some(state);
89 true
90 } else {
91 false
92 }
93 }
94
95 pub async fn take_state(&self) -> Option<StateType> {
98 self.state.lock().await.take()
99 }
100
101 pub fn try_take_state(&self) -> Option<Option<StateType>> {
103 if let Ok(mut locked_state) = self.state.try_lock() {
104 Some(locked_state.take())
105 } else {
106 None
107 }
108 }
109
110 #[inline(always)]
111 pub fn pending_items_count(&self) -> u32 {
112 self.retryable_sender.pending_items_count()
113 }
114
115 #[inline(always)]
116 pub fn buffer_size(&self) -> u32 {
117 self.retryable_sender.buffer_size()
118 }
119
120 pub async fn flush_and_close(&self, timeout: Duration) -> u32 {
121 self.retryable_sender.flush_and_close(timeout).await
122 }
123
124 pub fn cancel_and_close(&self) {
125 self.retryable_sender.cancel_and_close();
126 }
127}
128
129
130impl<const CONFIG: u64,
131 LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug,
132 SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
133 StateType: Send + Sync + Clone + Debug>
134Debug for
135Peer<CONFIG, LocalMessages, SenderChannel, StateType> {
136 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
137 write!(f, "Peer {{peer_id: {}, peer_address: '{}', state: '{:?}', sender: {}/{} pending messages}}",
138 self.peer_id, self.peer_address, self.state.try_lock(), self.retryable_sender.pending_items_count(), self.retryable_sender.buffer_size())
139 }
140}