Skip to main content

radicle_protocol/service/
session.rs

1use std::collections::VecDeque;
2use std::{fmt, time};
3
4use crossbeam_channel as chan;
5use radicle::node::{FetchResult, Severity};
6use radicle::node::{Link, Timestamp};
7pub use radicle::node::{PingState, State};
8use radicle::storage::refs::RefsAt;
9
10use crate::service::message;
11use crate::service::message::Message;
12use crate::service::{Address, LocalDuration, LocalTime, NodeId, Outbox, RepoId, Rng};
13
/// Time after which a connection is considered stable.
///
/// [`Session::idle`] compares this against the time elapsed since the
/// session entered the connected state.
pub const CONNECTION_STABLE_THRESHOLD: LocalDuration = LocalDuration::from_mins(1);
/// Maximum items in the fetch queue.
pub const MAX_FETCH_QUEUE_SIZE: usize = 128;
18
/// Session-level errors attributed to the remote peer.
///
/// Each variant maps to a [`Severity`] through [`Error::severity`].
#[derive(thiserror::Error, Debug, Clone, Copy)]
pub enum Error {
    /// The remote peer sent an invalid announcement timestamp,
    /// e.g. a timestamp far in the future.
    #[error("invalid announcement timestamp: {0}")]
    InvalidTimestamp(Timestamp),
    /// The remote peer sent git protocol messages while we were expecting
    /// gossip messages, or vice versa.
    #[error("protocol mismatch")]
    ProtocolMismatch,
    /// The remote peer did something that violates the protocol rules.
    #[error("peer misbehaved")]
    Misbehavior,
    /// The remote peer timed out.
    #[error("peer timed out")]
    Timeout,
}
36
37impl Error {
38    /// Return the severity for this error.
39    pub fn severity(&self) -> Severity {
40        match self {
41            Self::InvalidTimestamp(_) => Severity::High,
42            Self::ProtocolMismatch => Severity::High,
43            Self::Misbehavior => Severity::High,
44            Self::Timeout => Severity::Low,
45        }
46    }
47}
48
/// Error when trying to queue a fetch.
///
/// Each variant carries the [`QueuedFetch`] associated with the error; use
/// [`QueueError::inner`] to borrow it regardless of the variant.
#[derive(thiserror::Error, Debug, Clone)]
pub enum QueueError {
    /// The item already exists in the queue.
    #[error("item is already queued")]
    Duplicate(QueuedFetch),
    /// The queue is at capacity.
    #[error("queue capacity reached")]
    CapacityReached(QueuedFetch),
}
59
60impl QueueError {
61    /// Get the inner [`QueuedFetch`].
62    pub fn inner(&self) -> &QueuedFetch {
63        match self {
64            Self::Duplicate(f) => f,
65            Self::CapacityReached(f) => f,
66        }
67    }
68}
69
/// Fetch waiting to be processed, in the fetch queue.
///
/// Note that equality for this type is hand-written (see the `PartialEq`
/// impl): it ignores `timeout` and is only ever true when neither side has a
/// result channel.
#[derive(Debug, Clone)]
pub struct QueuedFetch {
    /// Repo being fetched.
    pub rid: RepoId,
    /// Peer being fetched from.
    pub from: NodeId,
    /// Refs being fetched.
    pub refs_at: Vec<RefsAt>,
    /// The timeout given for the fetch request.
    pub timeout: time::Duration,
    /// Result channel, if a [`FetchResult`] should be sent back.
    pub channel: Option<chan::Sender<FetchResult>>,
}
84
85impl PartialEq for QueuedFetch {
86    fn eq(&self, other: &Self) -> bool {
87        self.rid == other.rid
88            && self.from == other.from
89            && self.refs_at == other.refs_at
90            && self.channel.is_none()
91            && other.channel.is_none()
92    }
93}
94
/// A peer session. Each connected peer will have one session.
#[derive(Debug, Clone)]
pub struct Session {
    /// Peer id.
    pub id: NodeId,
    /// Peer address.
    pub addr: Address,
    /// Connection direction.
    pub link: Link,
    /// Whether we should attempt to re-connect
    /// to this peer upon disconnection.
    pub persistent: bool,
    /// Peer connection state.
    pub state: State,
    /// Peer subscription.
    pub subscribe: Option<message::Subscribe>,
    /// Last time a message was received from the peer.
    pub last_active: LocalTime,

    /// Connection attempts. For persistent peers, tracks
    /// how many times we've attempted to connect. We reset this to zero
    /// upon successful connection, once the connection is stable.
    attempts: usize,
    /// Source of entropy.
    rng: Rng,
}
121
122impl fmt::Display for Session {
123    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124        let mut attrs = Vec::new();
125        let state = self.state.to_string();
126
127        if self.link.is_inbound() {
128            attrs.push("inbound");
129        } else {
130            attrs.push("outbound");
131        }
132        if self.persistent {
133            attrs.push("persistent");
134        }
135        attrs.push(state.as_str());
136
137        write!(f, "{} [{}]", self.id, attrs.join(" "))
138    }
139}
140
141impl From<&Session> for radicle::node::Session {
142    fn from(s: &Session) -> Self {
143        Self {
144            nid: s.id,
145            link: if s.link.is_inbound() {
146                radicle::node::Link::Inbound
147            } else {
148                radicle::node::Link::Outbound
149            },
150            addr: s.addr.clone(),
151            state: s.state.clone(),
152        }
153    }
154}
155
156impl Session {
157    pub fn outbound(id: NodeId, addr: Address, persistent: bool, rng: Rng) -> Self {
158        Self {
159            id,
160            addr,
161            state: State::Initial,
162            link: Link::Outbound,
163            subscribe: None,
164            persistent,
165            last_active: LocalTime::default(),
166            attempts: 1,
167            rng,
168        }
169    }
170
171    pub fn inbound(id: NodeId, addr: Address, persistent: bool, rng: Rng, time: LocalTime) -> Self {
172        Self {
173            id,
174            addr,
175            state: State::Connected {
176                since: time,
177                ping: PingState::default(),
178                latencies: VecDeque::default(),
179                stable: false,
180            },
181            link: Link::Inbound,
182            subscribe: None,
183            persistent,
184            last_active: time,
185            attempts: 0,
186            rng,
187        }
188    }
189
190    pub fn is_connecting(&self) -> bool {
191        matches!(self.state, State::Attempted)
192    }
193
194    pub fn is_stable(&self) -> bool {
195        matches!(self.state, State::Connected { stable: true, .. })
196    }
197
198    pub fn is_connected(&self) -> bool {
199        self.state.is_connected()
200    }
201
202    pub fn is_disconnected(&self) -> bool {
203        matches!(self.state, State::Disconnected { .. })
204    }
205
206    pub fn is_initial(&self) -> bool {
207        matches!(self.state, State::Initial)
208    }
209
210    pub fn attempts(&self) -> usize {
211        self.attempts
212    }
213
214    /// Run 'idle' task for session.
215    pub fn idle(&mut self, now: LocalTime) {
216        if let State::Connected {
217            since,
218            ref mut stable,
219            ..
220        } = self.state
221        {
222            if now >= since && now.duration_since(since) >= CONNECTION_STABLE_THRESHOLD {
223                *stable = true;
224                // Reset number of attempts for stable connections.
225                self.attempts = 0;
226            }
227        }
228    }
229
230    pub fn to_attempted(&mut self) {
231        assert!(
232            self.is_initial(),
233            "Can only transition to 'attempted' state from 'initial' state"
234        );
235        self.state = State::Attempted;
236        self.attempts += 1;
237    }
238
239    pub fn to_connected(&mut self, since: LocalTime) {
240        self.last_active = since;
241
242        if let State::Connected { .. } = &self.state {
243            log::debug!(target: "service", "Session {} is already in 'connected' state, resetting..", self.id);
244        };
245        self.state = State::Connected {
246            since,
247            ping: PingState::default(),
248            latencies: VecDeque::default(),
249            stable: false,
250        };
251    }
252
253    /// Move the session state to "disconnected". Returns any pending RID
254    /// that was requested.
255    pub fn to_disconnected(&mut self, since: LocalTime, retry_at: LocalTime) {
256        self.state = State::Disconnected { since, retry_at };
257    }
258
259    /// Return to initial state from disconnected state. This state transition
260    /// happens when we attempt to re-connect to a disconnected peer.
261    pub fn to_initial(&mut self) {
262        assert!(
263            self.is_disconnected(),
264            "Can only transition to 'initial' state from 'disconnected' state"
265        );
266        self.state = State::Initial;
267    }
268
269    pub fn ping(&mut self, since: LocalTime, reactor: &mut Outbox) -> Result<(), Error> {
270        if let State::Connected { ping, .. } = &mut self.state {
271            let msg = message::Ping::new(&mut self.rng);
272            *ping = PingState::AwaitingResponse {
273                len: msg.ponglen,
274                since,
275            };
276            reactor.write(self, Message::Ping(msg));
277        }
278        Ok(())
279    }
280}