radicle_node/service/
session.rs

1use std::collections::{HashSet, VecDeque};
2use std::{fmt, time};
3
4use crossbeam_channel as chan;
5
6use crate::node::config::Limits;
7use crate::node::{FetchResult, Severity};
8use crate::service::message;
9use crate::service::message::Message;
10use crate::service::{Address, LocalDuration, LocalTime, NodeId, Outbox, RepoId, Rng};
11use crate::storage::refs::RefsAt;
12use crate::{Link, Timestamp};
13
14pub use crate::node::{PingState, State};
15
/// Time after which a connection is considered stable.
///
/// [`Session::idle`] compares this against the time elapsed since the session
/// entered the connected state.
pub const CONNECTION_STABLE_THRESHOLD: LocalDuration = LocalDuration::from_mins(1);
/// Maximum number of items in a session's fetch queue.
///
/// [`Session::queue_fetch`] rejects new fetches once the queue holds this many.
pub const MAX_FETCH_QUEUE_SIZE: usize = 128;
20
/// Session errors attributed to the remote peer.
///
/// See [`Error::severity`] for how each variant is rated.
#[derive(thiserror::Error, Debug, Clone, Copy)]
pub enum Error {
    /// The remote peer sent an invalid announcement timestamp,
    /// e.g. a timestamp far in the future.
    #[error("invalid announcement timestamp: {0}")]
    InvalidTimestamp(Timestamp),
    /// The remote peer sent git protocol messages while we were expecting
    /// gossip messages, or vice versa.
    #[error("protocol mismatch")]
    ProtocolMismatch,
    /// The remote peer did something that violates the protocol rules.
    #[error("peer misbehaved")]
    Misbehavior,
    /// The remote peer timed out.
    #[error("peer timed out")]
    Timeout,
}
38
39impl Error {
40    /// Return the severity for this error.
41    pub fn severity(&self) -> Severity {
42        match self {
43            Self::InvalidTimestamp(_) => Severity::High,
44            Self::ProtocolMismatch => Severity::High,
45            Self::Misbehavior => Severity::High,
46            Self::Timeout => Severity::Low,
47        }
48    }
49}
50
/// Error when trying to queue a fetch.
///
/// Each variant carries the [`QueuedFetch`] that was rejected.
#[derive(thiserror::Error, Debug, Clone)]
pub enum QueueError {
    /// The item already exists in the queue.
    #[error("item is already queued")]
    Duplicate(QueuedFetch),
    /// The queue is at capacity.
    #[error("queue capacity reached")]
    CapacityReached(QueuedFetch),
}
61
62impl QueueError {
63    /// Get the inner [`QueuedFetch`].
64    pub fn inner(&self) -> &QueuedFetch {
65        match self {
66            Self::Duplicate(f) => f,
67            Self::CapacityReached(f) => f,
68        }
69    }
70}
71
/// Fetch waiting to be processed, in the fetch queue.
///
/// Note that the `PartialEq` implementation for this type does not compare
/// `timeout`, and never considers two fetches equal if either one carries a
/// result channel.
#[derive(Debug, Clone)]
pub struct QueuedFetch {
    /// Repo being fetched.
    pub rid: RepoId,
    /// Peer being fetched from.
    pub from: NodeId,
    /// Refs being fetched.
    pub refs_at: Vec<RefsAt>,
    /// The timeout given for the fetch request.
    pub timeout: time::Duration,
    /// Result channel, if any.
    pub channel: Option<chan::Sender<FetchResult>>,
}
86
87impl PartialEq for QueuedFetch {
88    fn eq(&self, other: &Self) -> bool {
89        self.rid == other.rid
90            && self.from == other.from
91            && self.refs_at == other.refs_at
92            && self.channel.is_none()
93            && other.channel.is_none()
94    }
95}
96
/// A peer session. Each connected peer will have one session.
#[derive(Debug, Clone)]
pub struct Session {
    /// Peer id.
    pub id: NodeId,
    /// Peer address.
    pub addr: Address,
    /// Connection direction.
    pub link: Link,
    /// Whether we should attempt to re-connect
    /// to this peer upon disconnection.
    pub persistent: bool,
    /// Peer connection state.
    pub state: State,
    /// Peer subscription.
    pub subscribe: Option<message::Subscribe>,
    /// Last time a message was received from the peer.
    pub last_active: LocalTime,
    /// Fetch queue, holding at most [`MAX_FETCH_QUEUE_SIZE`] items.
    pub queue: VecDeque<QueuedFetch>,

    /// Connection attempts. For persistent peers, tracks
    /// how many times we've attempted to connect. We reset this to zero
    /// upon successful connection, once the connection is stable.
    attempts: usize,
    /// Source of entropy.
    rng: Rng,
    /// Protocol limits.
    limits: Limits,
}
127
128impl fmt::Display for Session {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        let mut attrs = Vec::new();
131        let state = self.state.to_string();
132
133        if self.link.is_inbound() {
134            attrs.push("inbound");
135        } else {
136            attrs.push("outbound");
137        }
138        if self.persistent {
139            attrs.push("persistent");
140        }
141        attrs.push(state.as_str());
142
143        write!(f, "{} [{}]", self.id, attrs.join(" "))
144    }
145}
146
147impl From<&Session> for radicle::node::Session {
148    fn from(s: &Session) -> Self {
149        Self {
150            nid: s.id,
151            link: if s.link.is_inbound() {
152                radicle::node::Link::Inbound
153            } else {
154                radicle::node::Link::Outbound
155            },
156            addr: s.addr.clone(),
157            state: s.state.clone(),
158        }
159    }
160}
161
162impl Session {
163    pub fn outbound(id: NodeId, addr: Address, persistent: bool, rng: Rng, limits: Limits) -> Self {
164        Self {
165            id,
166            addr,
167            state: State::Initial,
168            link: Link::Outbound,
169            subscribe: None,
170            persistent,
171            last_active: LocalTime::default(),
172            queue: VecDeque::with_capacity(MAX_FETCH_QUEUE_SIZE),
173            attempts: 1,
174            rng,
175            limits,
176        }
177    }
178
179    pub fn inbound(
180        id: NodeId,
181        addr: Address,
182        persistent: bool,
183        rng: Rng,
184        time: LocalTime,
185        limits: Limits,
186    ) -> Self {
187        Self {
188            id,
189            addr,
190            state: State::Connected {
191                since: time,
192                ping: PingState::default(),
193                fetching: HashSet::default(),
194                latencies: VecDeque::default(),
195                stable: false,
196            },
197            link: Link::Inbound,
198            subscribe: None,
199            persistent,
200            last_active: time,
201            queue: VecDeque::new(),
202            attempts: 0,
203            rng,
204            limits,
205        }
206    }
207
208    pub fn is_connecting(&self) -> bool {
209        matches!(self.state, State::Attempted { .. })
210    }
211
212    pub fn is_stable(&self) -> bool {
213        matches!(self.state, State::Connected { stable: true, .. })
214    }
215
    /// Whether the session state reports itself as connected.
    ///
    /// Delegates to the `is_connected` method of the session's [`State`].
    pub fn is_connected(&self) -> bool {
        self.state.is_connected()
    }
219
220    pub fn is_disconnected(&self) -> bool {
221        matches!(self.state, State::Disconnected { .. })
222    }
223
224    pub fn is_initial(&self) -> bool {
225        matches!(self.state, State::Initial)
226    }
227
228    pub fn is_at_capacity(&self) -> bool {
229        if let State::Connected { fetching, .. } = &self.state {
230            if fetching.len() >= self.limits.fetch_concurrency {
231                return true;
232            }
233        }
234        false
235    }
236
237    pub fn is_fetching(&self, rid: &RepoId) -> bool {
238        if let State::Connected { fetching, .. } = &self.state {
239            return fetching.contains(rid);
240        }
241        false
242    }
243
244    /// Queue a fetch. Returns `true` if it was added to the queue, and `false` if
245    /// it already was present in the queue.
246    pub fn queue_fetch(&mut self, fetch: QueuedFetch) -> Result<(), QueueError> {
247        assert_eq!(fetch.from, self.id);
248
249        if self.queue.len() >= MAX_FETCH_QUEUE_SIZE {
250            return Err(QueueError::CapacityReached(fetch));
251        } else if self.queue.contains(&fetch) {
252            return Err(QueueError::Duplicate(fetch));
253        }
254        self.queue.push_back(fetch);
255
256        Ok(())
257    }
258
    /// Remove and return the fetch at the front of the queue, or `None` if
    /// the queue is empty.
    pub fn dequeue_fetch(&mut self) -> Option<QueuedFetch> {
        self.queue.pop_front()
    }
262
    /// Current value of the connection attempt counter.
    pub fn attempts(&self) -> usize {
        self.attempts
    }
266
267    /// Run 'idle' task for session.
268    pub fn idle(&mut self, now: LocalTime) {
269        if let State::Connected {
270            since,
271            ref mut stable,
272            ..
273        } = self.state
274        {
275            if now >= since && now.duration_since(since) >= CONNECTION_STABLE_THRESHOLD {
276                *stable = true;
277                // Reset number of attempts for stable connections.
278                self.attempts = 0;
279            }
280        }
281    }
282
283    /// Mark this session as fetching the given RID.
284    ///
285    /// # Panics
286    ///
287    /// If it is already fetching that RID, or the session is disconnected.
288    pub fn fetching(&mut self, rid: RepoId) {
289        if let State::Connected { fetching, .. } = &mut self.state {
290            assert!(
291                fetching.insert(rid),
292                "Session must not already be fetching {rid}"
293            );
294        } else {
295            panic!(
296                "Attempting to fetch {rid} from disconnected session {}",
297                self.id
298            );
299        }
300    }
301
302    pub fn fetched(&mut self, rid: RepoId) {
303        if let State::Connected { fetching, .. } = &mut self.state {
304            if !fetching.remove(&rid) {
305                log::warn!(target: "service", "Fetched unknown repository {rid}");
306            }
307        }
308    }
309
310    pub fn to_attempted(&mut self) {
311        assert!(
312            self.is_initial(),
313            "Can only transition to 'attempted' state from 'initial' state"
314        );
315        self.state = State::Attempted;
316        self.attempts += 1;
317    }
318
319    pub fn to_connected(&mut self, since: LocalTime) {
320        self.last_active = since;
321
322        if let State::Connected { .. } = &self.state {
323            log::error!(target: "service", "Session {} is already in 'connected' state, resetting..", self.id);
324        };
325        self.state = State::Connected {
326            since,
327            ping: PingState::default(),
328            fetching: HashSet::default(),
329            latencies: VecDeque::default(),
330            stable: false,
331        };
332    }
333
334    /// Move the session state to "disconnected". Returns any pending RID
335    /// that was requested.
336    pub fn to_disconnected(&mut self, since: LocalTime, retry_at: LocalTime) {
337        self.state = State::Disconnected { since, retry_at };
338    }
339
340    /// Return to initial state from disconnected state. This state transition
341    /// happens when we attempt to re-connect to a disconnected peer.
342    pub fn to_initial(&mut self) {
343        assert!(
344            self.is_disconnected(),
345            "Can only transition to 'initial' state from 'disconnected' state"
346        );
347        self.state = State::Initial;
348    }
349
350    pub fn ping(&mut self, since: LocalTime, reactor: &mut Outbox) -> Result<(), Error> {
351        if let State::Connected { ping, .. } = &mut self.state {
352            let msg = message::Ping::new(&mut self.rng);
353            *ping = PingState::AwaitingResponse {
354                len: msg.ponglen,
355                since,
356            };
357            reactor.write(self, Message::Ping(msg));
358        }
359        Ok(())
360    }
361}