reactive_messaging/socket_connection/
peer.rs

1//! Resting place for [Peer], representing the remote party on a TCP/IP socket connection
2
3use crate::{
4    serde::ReactiveMessagingSerializer,
5    socket_connection::common::{
6        ReactiveMessagingSender,
7    },
8};
9use std::{
10    fmt::{Debug, Formatter},
11    net::SocketAddr,
12    time::Duration,
13};
14use reactive_mutiny::prelude::advanced::{
15    MutinyStream,
16    FullDuplexUniChannel,
17};
18use tokio::sync::Mutex;
19use crate::config::ConstConfig;
20use crate::socket_connection::connection::{ConnectionId, SocketConnection};
21
22
23/// Represents a reactive channel connected to a remote peer, through which we're able to send out "local messages" of type `RetryableSenderImpl::LocalMessages`.\
24/// the [Self::send()] method honors whatever retrying config is specified in [RetryableSenderImpl::CONST_CONFIG].
25/// IMPLEMENTATION NOTE: GAT traits (to reduce the number of generic parameters) couldn't be used here -- even after applying this compiler bug workaround https://github.com/rust-lang/rust/issues/102211#issuecomment-1513931928
26///                      -- the "error: implementation of `std::marker::Send` is not general enough" bug kept on popping up in user provided closures that called other async functions.
27pub struct Peer<const CONFIG:     u64,
28                LocalMessages:    ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug + 'static,
29                SenderChannel:    FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
30                StateType:                                                                                      Send + Sync + Clone     + Debug + 'static = ()> {
31    // TODO rename to `id`
32    pub peer_id:          ConnectionId,
33    pub peer_address:     SocketAddr,
34    pub state:            Mutex<Option<StateType>>,
35        retryable_sender: ReactiveMessagingSender<CONFIG, LocalMessages, SenderChannel>,
36}
37
38impl<const CONFIG:  u64,
39     LocalMessages: ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug,
40     SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
41     StateType:                                                                                   Send + Sync + Clone     + Debug>
42Peer<CONFIG, LocalMessages, SenderChannel, StateType> {
43
44    pub fn new(retryable_sender: ReactiveMessagingSender<CONFIG, LocalMessages, SenderChannel>, peer_address: SocketAddr, connection: &SocketConnection<StateType>) -> Self {
45        Self {
46            peer_id: connection.id(),
47            peer_address,
48            state: Mutex::new(Some(connection.state().clone())),
49            retryable_sender,
50        }
51    }
52
53    pub fn config(&self) -> ConstConfig {
54        ConstConfig::from(CONFIG)
55    }
56
57    /// Asks the underlying channel to revert to Stream-mode (rather than Execution-mode), returning the `Stream`
58    pub fn create_stream(&self) -> (MutinyStream<'static, LocalMessages, SenderChannel, LocalMessages>, u32) {
59        self.retryable_sender.create_stream()
60    }
61
62
63    #[inline(always)]
64    pub fn send(&self,
65                message: LocalMessages)
66               -> Result<(), (/*abort_the_connection?*/bool, /*error_message: */String)> {
67        self.retryable_sender.send(message)
68    }
69
70    #[inline(always)]
71    pub async fn send_async(&self,
72                            message: LocalMessages)
73                           -> Result<(), (/*abort_the_connection?*/bool, /*error_message: */String)> {
74
75        self.retryable_sender.send_async_trait(message).await
76    }
77
78    /// Sets this object to a user-provided state, to facilitate communications between protocol processors
79    /// (a requirement to allow the "Composite Protocol Stacking" pattern).\
80    /// See also [Self::take_state()]
81    pub async fn set_state(&self, state: StateType) {
82        *self.state.lock().await = Some(state);
83    }
84
85    /// Use [set_state()] (async) if possible
86    pub fn try_set_state(&self, state: StateType) -> bool {
87        if let Ok(mut locked_state) = self.state.try_lock() {
88            *locked_state = Some(state);
89            true
90        } else {
91            false
92        }
93    }
94
95    /// "Takes" this object's user-provided state, previously set by [Self::set_state()] -- used to facilitate communications between protocol processors
96    /// (a requirement to allow the "Composite Protocol Stacking" pattern)
97    pub async fn take_state(&self) -> Option<StateType> {
98        self.state.lock().await.take()
99    }
100
101    /// Use [Self::take_state()] (async) if possible
102    pub fn try_take_state(&self) -> Option<Option<StateType>> {
103        if let Ok(mut locked_state) = self.state.try_lock() {
104            Some(locked_state.take())
105        } else {
106            None
107        }
108    }
109
110    #[inline(always)]
111    pub fn pending_items_count(&self) -> u32 {
112        self.retryable_sender.pending_items_count()
113    }
114
115    #[inline(always)]
116    pub fn buffer_size(&self) -> u32 {
117        self.retryable_sender.buffer_size()
118    }
119
120    pub async fn flush_and_close(&self, timeout: Duration) -> u32 {
121        self.retryable_sender.flush_and_close(timeout).await
122    }
123
124    pub fn cancel_and_close(&self) {
125        self.retryable_sender.cancel_and_close();
126    }
127}
128
129
130impl<const CONFIG:  u64,
131     LocalMessages: ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug,
132     SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
133     StateType:                                                                                   Send + Sync + Clone     + Debug>
134Debug for
135Peer<CONFIG, LocalMessages, SenderChannel, StateType> {
136    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
137        write!(f, "Peer {{peer_id: {}, peer_address: '{}', state: '{:?}', sender: {}/{} pending messages}}",
138               self.peer_id, self.peer_address, self.state.try_lock(), self.retryable_sender.pending_items_count(), self.retryable_sender.buffer_size())
139    }
140}