radicle_protocol/service/
session.rs

1use std::collections::{HashSet, VecDeque};
2use std::{fmt, time};
3
4use crossbeam_channel as chan;
5use radicle::node::config::Limits;
6use radicle::node::{FetchResult, Severity};
7use radicle::node::{Link, Timestamp};
8pub use radicle::node::{PingState, State};
9use radicle::storage::refs::RefsAt;
10
11use crate::service::message;
12use crate::service::message::Message;
13use crate::service::{Address, LocalDuration, LocalTime, NodeId, Outbox, RepoId, Rng};
14
15/// Time after which a connection is considered stable.
16pub const CONNECTION_STABLE_THRESHOLD: LocalDuration = LocalDuration::from_mins(1);
17/// Maximum items in the fetch queue.
18pub const MAX_FETCH_QUEUE_SIZE: usize = 128;
19
20#[derive(thiserror::Error, Debug, Clone, Copy)]
21pub enum Error {
22    /// The remote peer sent an invalid announcement timestamp,
23    /// for eg. a timestamp far in the future.
24    #[error("invalid announcement timestamp: {0}")]
25    InvalidTimestamp(Timestamp),
26    /// The remote peer sent git protocol messages while we were expecting
27    /// gossip messages. Or vice-versa.
28    #[error("protocol mismatch")]
29    ProtocolMismatch,
30    /// The remote peer did something that violates the protocol rules.
31    #[error("peer misbehaved")]
32    Misbehavior,
33    /// The remote peer timed out.
34    #[error("peer timed out")]
35    Timeout,
36}
37
38impl Error {
39    /// Return the severity for this error.
40    pub fn severity(&self) -> Severity {
41        match self {
42            Self::InvalidTimestamp(_) => Severity::High,
43            Self::ProtocolMismatch => Severity::High,
44            Self::Misbehavior => Severity::High,
45            Self::Timeout => Severity::Low,
46        }
47    }
48}
49
50/// Error when trying to queue a fetch.
51#[derive(thiserror::Error, Debug, Clone)]
52pub enum QueueError {
53    /// The item already exists in the queue.
54    #[error("item is already queued")]
55    Duplicate(QueuedFetch),
56    /// The queue is at capacity.
57    #[error("queue capacity reached")]
58    CapacityReached(QueuedFetch),
59}
60
61impl QueueError {
62    /// Get the inner [`QueuedFetch`].
63    pub fn inner(&self) -> &QueuedFetch {
64        match self {
65            Self::Duplicate(f) => f,
66            Self::CapacityReached(f) => f,
67        }
68    }
69}
70
71/// Fetch waiting to be processed, in the fetch queue.
72#[derive(Debug, Clone)]
73pub struct QueuedFetch {
74    /// Repo being fetched.
75    pub rid: RepoId,
76    /// Peer being fetched from.
77    pub from: NodeId,
78    /// Refs being fetched.
79    pub refs_at: Vec<RefsAt>,
80    /// The timeout given for the fetch request.
81    pub timeout: time::Duration,
82    /// Result channel.
83    pub channel: Option<chan::Sender<FetchResult>>,
84}
85
86impl PartialEq for QueuedFetch {
87    fn eq(&self, other: &Self) -> bool {
88        self.rid == other.rid
89            && self.from == other.from
90            && self.refs_at == other.refs_at
91            && self.channel.is_none()
92            && other.channel.is_none()
93    }
94}
95
96/// A peer session. Each connected peer will have one session.
97#[derive(Debug, Clone)]
98pub struct Session {
99    /// Peer id.
100    pub id: NodeId,
101    /// Peer address.
102    pub addr: Address,
103    /// Connection direction.
104    pub link: Link,
105    /// Whether we should attempt to re-connect
106    /// to this peer upon disconnection.
107    pub persistent: bool,
108    /// Peer connection state.
109    pub state: State,
110    /// Peer subscription.
111    pub subscribe: Option<message::Subscribe>,
112    /// Last time a message was received from the peer.
113    pub last_active: LocalTime,
114    /// Fetch queue.
115    pub queue: VecDeque<QueuedFetch>,
116
117    /// Connection attempts. For persistent peers, Tracks
118    /// how many times we've attempted to connect. We reset this to zero
119    /// upon successful connection, once the connection is stable.
120    attempts: usize,
121    /// Source of entropy.
122    rng: Rng,
123    /// Protocol limits.
124    limits: Limits,
125}
126
127impl fmt::Display for Session {
128    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
129        let mut attrs = Vec::new();
130        let state = self.state.to_string();
131
132        if self.link.is_inbound() {
133            attrs.push("inbound");
134        } else {
135            attrs.push("outbound");
136        }
137        if self.persistent {
138            attrs.push("persistent");
139        }
140        attrs.push(state.as_str());
141
142        write!(f, "{} [{}]", self.id, attrs.join(" "))
143    }
144}
145
146impl From<&Session> for radicle::node::Session {
147    fn from(s: &Session) -> Self {
148        Self {
149            nid: s.id,
150            link: if s.link.is_inbound() {
151                radicle::node::Link::Inbound
152            } else {
153                radicle::node::Link::Outbound
154            },
155            addr: s.addr.clone(),
156            state: s.state.clone(),
157        }
158    }
159}
160
161impl Session {
162    pub fn outbound(id: NodeId, addr: Address, persistent: bool, rng: Rng, limits: Limits) -> Self {
163        Self {
164            id,
165            addr,
166            state: State::Initial,
167            link: Link::Outbound,
168            subscribe: None,
169            persistent,
170            last_active: LocalTime::default(),
171            queue: VecDeque::with_capacity(MAX_FETCH_QUEUE_SIZE),
172            attempts: 1,
173            rng,
174            limits,
175        }
176    }
177
178    pub fn inbound(
179        id: NodeId,
180        addr: Address,
181        persistent: bool,
182        rng: Rng,
183        time: LocalTime,
184        limits: Limits,
185    ) -> Self {
186        Self {
187            id,
188            addr,
189            state: State::Connected {
190                since: time,
191                ping: PingState::default(),
192                fetching: HashSet::default(),
193                latencies: VecDeque::default(),
194                stable: false,
195            },
196            link: Link::Inbound,
197            subscribe: None,
198            persistent,
199            last_active: time,
200            queue: VecDeque::new(),
201            attempts: 0,
202            rng,
203            limits,
204        }
205    }
206
207    pub fn is_connecting(&self) -> bool {
208        matches!(self.state, State::Attempted)
209    }
210
211    pub fn is_stable(&self) -> bool {
212        matches!(self.state, State::Connected { stable: true, .. })
213    }
214
215    pub fn is_connected(&self) -> bool {
216        self.state.is_connected()
217    }
218
219    pub fn is_disconnected(&self) -> bool {
220        matches!(self.state, State::Disconnected { .. })
221    }
222
223    pub fn is_initial(&self) -> bool {
224        matches!(self.state, State::Initial)
225    }
226
227    pub fn is_at_capacity(&self) -> bool {
228        if let State::Connected { fetching, .. } = &self.state {
229            if fetching.len() >= self.limits.fetch_concurrency.into() {
230                return true;
231            }
232        }
233        false
234    }
235
236    pub fn is_fetching(&self, rid: &RepoId) -> bool {
237        if let State::Connected { fetching, .. } = &self.state {
238            return fetching.contains(rid);
239        }
240        false
241    }
242
243    /// Queue a fetch. Returns `true` if it was added to the queue, and `false` if
244    /// it already was present in the queue.
245    pub fn queue_fetch(&mut self, fetch: QueuedFetch) -> Result<(), QueueError> {
246        assert_eq!(fetch.from, self.id);
247
248        if self.queue.len() >= MAX_FETCH_QUEUE_SIZE {
249            return Err(QueueError::CapacityReached(fetch));
250        } else if self.queue.contains(&fetch) {
251            return Err(QueueError::Duplicate(fetch));
252        }
253        self.queue.push_back(fetch);
254
255        Ok(())
256    }
257
258    pub fn dequeue_fetch(&mut self) -> Option<QueuedFetch> {
259        self.queue.pop_front()
260    }
261
262    pub fn attempts(&self) -> usize {
263        self.attempts
264    }
265
266    /// Run 'idle' task for session.
267    pub fn idle(&mut self, now: LocalTime) {
268        if let State::Connected {
269            since,
270            ref mut stable,
271            ..
272        } = self.state
273        {
274            if now >= since && now.duration_since(since) >= CONNECTION_STABLE_THRESHOLD {
275                *stable = true;
276                // Reset number of attempts for stable connections.
277                self.attempts = 0;
278            }
279        }
280    }
281
282    /// Mark this session as fetching the given RID.
283    ///
284    /// # Panics
285    ///
286    /// If it is already fetching that RID, or the session is disconnected.
287    pub fn fetching(&mut self, rid: RepoId) {
288        if let State::Connected { fetching, .. } = &mut self.state {
289            assert!(
290                fetching.insert(rid),
291                "Session must not already be fetching {rid}"
292            );
293        } else {
294            panic!(
295                "Attempting to fetch {rid} from disconnected session {}",
296                self.id
297            );
298        }
299    }
300
301    pub fn fetched(&mut self, rid: RepoId) {
302        if let State::Connected { fetching, .. } = &mut self.state {
303            if !fetching.remove(&rid) {
304                log::warn!(target: "service", "Fetched unknown repository {rid}");
305            }
306        }
307    }
308
309    pub fn to_attempted(&mut self) {
310        assert!(
311            self.is_initial(),
312            "Can only transition to 'attempted' state from 'initial' state"
313        );
314        self.state = State::Attempted;
315        self.attempts += 1;
316    }
317
318    pub fn to_connected(&mut self, since: LocalTime) {
319        self.last_active = since;
320
321        if let State::Connected { .. } = &self.state {
322            log::error!(target: "service", "Session {} is already in 'connected' state, resetting..", self.id);
323        };
324        self.state = State::Connected {
325            since,
326            ping: PingState::default(),
327            fetching: HashSet::default(),
328            latencies: VecDeque::default(),
329            stable: false,
330        };
331    }
332
333    /// Move the session state to "disconnected". Returns any pending RID
334    /// that was requested.
335    pub fn to_disconnected(&mut self, since: LocalTime, retry_at: LocalTime) {
336        self.state = State::Disconnected { since, retry_at };
337    }
338
339    /// Return to initial state from disconnected state. This state transition
340    /// happens when we attempt to re-connect to a disconnected peer.
341    pub fn to_initial(&mut self) {
342        assert!(
343            self.is_disconnected(),
344            "Can only transition to 'initial' state from 'disconnected' state"
345        );
346        self.state = State::Initial;
347    }
348
349    pub fn ping(&mut self, since: LocalTime, reactor: &mut Outbox) -> Result<(), Error> {
350        if let State::Connected { ping, .. } = &mut self.state {
351            let msg = message::Ping::new(&mut self.rng);
352            *ping = PingState::AwaitingResponse {
353                len: msg.ponglen,
354                since,
355            };
356            reactor.write(self, Message::Ping(msg));
357        }
358        Ok(())
359    }
360}