1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
//! Resting place for [Peer], representing the remote party on a TCP/IP socket connection

use crate::{
    serde::ReactiveMessagingSerializer,
    socket_connection::common::{
        ReactiveMessagingSender,
    },
};
use std::{
    fmt::{Debug, Formatter},
    net::SocketAddr,
    time::Duration,
};
use reactive_mutiny::prelude::advanced::{
    MutinyStream,
    FullDuplexUniChannel,
};
use tokio::sync::Mutex;
use crate::socket_connection::connection::{ConnectionId, SocketConnection};


/// Represents a reactive channel connected to a remote peer, through which we're able to send out "local messages" of type `RetryableSenderImpl::LocalMessages`.\
/// the [Self::send()] method honors whatever retrying config is specified in [RetryableSenderImpl::CONST_CONFIG].
/// IMPLEMENTATION NOTE: GAT traits (to reduce the number of generic parameters) couldn't be used here -- even after applying this compiler bug workaround https://github.com/rust-lang/rust/issues/102211#issuecomment-1513931928
///                      -- the "error: implementation of `std::marker::Send` is not general enough" bug kept on popping up in user provided closures that called other async functions.
pub struct Peer<const CONFIG:     u64,
                LocalMessages:    ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug + 'static,
                SenderChannel:    FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
                StateType:                                                                                      Send + Sync + Clone     + Debug + 'static = ()> {
    // TODO rename to `id`
    pub peer_id:          ConnectionId,
    pub peer_address:     SocketAddr,
    pub state:            Mutex<Option<StateType>>,
        retryable_sender: ReactiveMessagingSender<CONFIG, LocalMessages, SenderChannel>,
}

impl<const CONFIG:  u64,
     LocalMessages: ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug,
     SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
     StateType:                                                                                   Send + Sync + Clone     + Debug>
Peer<CONFIG, LocalMessages, SenderChannel, StateType> {

    pub fn new(retryable_sender: ReactiveMessagingSender<CONFIG, LocalMessages, SenderChannel>, peer_address: SocketAddr, connection: &SocketConnection<StateType>) -> Self {
        Self {
            peer_id: connection.id(),
            peer_address,
            state: Mutex::new(Some(connection.state().clone())),
            retryable_sender,
        }
    }

    /// Asks the underlying channel to revert to Stream-mode (rather than Execution-mode), returning the `Stream`
    pub fn create_stream(&self) -> (MutinyStream<'static, LocalMessages, SenderChannel, LocalMessages>, u32) {
        self.retryable_sender.create_stream()
    }


    #[inline(always)]
    pub fn send(&self,
                message: LocalMessages)
               -> Result<(), (/*abort_the_connection?*/bool, /*error_message: */String)> {
        self.retryable_sender.send(message)
    }

    #[inline(always)]
    pub async fn send_async(&self,
                            message: LocalMessages)
                           -> Result<(), (/*abort_the_connection?*/bool, /*error_message: */String)> {

        self.retryable_sender.send_async_trait(message).await
    }

    /// Sets this object to a user-provided state, to facilitate communications between protocol processors
    /// (a requirement to allow the "Composite Protocol Stacking" pattern).\
    /// See also [Self::take_state()]
    pub async fn set_state(&self, state: StateType) {
        *self.state.lock().await = Some(state);
    }

    /// Use [set_state()] (async) if possible
    pub fn try_set_state(&self, state: StateType) -> bool {
        if let Ok(mut locked_state) = self.state.try_lock() {
            *locked_state = Some(state);
            true
        } else {
            false
        }
    }

    /// "Takes" this object's user-provided state, previously set by [Self::set_state()] -- used to facilitate communications between protocol processors
    /// (a requirement to allow the "Composite Protocol Stacking" pattern)
    pub async fn take_state(&self) -> Option<StateType> {
        self.state.lock().await.take()
    }

    /// Use [Self::take_state()] (async) if possible
    pub fn try_take_state(&self) -> Option<Option<StateType>> {
        if let Ok(mut locked_state) = self.state.try_lock() {
            Some(locked_state.take())
        } else {
            None
        }
    }

    #[inline(always)]
    pub fn pending_items_count(&self) -> u32 {
        self.retryable_sender.pending_items_count()
    }

    #[inline(always)]
    pub fn buffer_size(&self) -> u32 {
        self.retryable_sender.buffer_size()
    }

    pub async fn flush_and_close(&self, timeout: Duration) -> u32 {
        self.retryable_sender.flush_and_close(timeout).await
    }

    pub fn cancel_and_close(&self) {
        self.retryable_sender.cancel_and_close();
    }
}


impl<const CONFIG:  u64,
     LocalMessages: ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug,
     SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
     StateType:                                                                                   Send + Sync + Clone     + Debug>
Debug for
Peer<CONFIG, LocalMessages, SenderChannel, StateType> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "Peer {{peer_id: {}, peer_address: '{}', state: '{:?}', sender: {}/{} pending messages}}",
               self.peer_id, self.peer_address, self.state.try_lock(), self.retryable_sender.pending_items_count(), self.retryable_sender.buffer_size())
    }
}