radicle_protocol/service/
session.rs1use std::collections::{HashSet, VecDeque};
2use std::{fmt, time};
3
4use crossbeam_channel as chan;
5use radicle::node::config::Limits;
6use radicle::node::{FetchResult, Severity};
7use radicle::node::{Link, Timestamp};
8pub use radicle::node::{PingState, State};
9use radicle::storage::refs::RefsAt;
10
11use crate::service::message;
12use crate::service::message::Message;
13use crate::service::{Address, LocalDuration, LocalTime, NodeId, Outbox, RepoId, Rng};
14
15pub const CONNECTION_STABLE_THRESHOLD: LocalDuration = LocalDuration::from_mins(1);
17pub const MAX_FETCH_QUEUE_SIZE: usize = 128;
19
20#[derive(thiserror::Error, Debug, Clone, Copy)]
21pub enum Error {
22 #[error("invalid announcement timestamp: {0}")]
25 InvalidTimestamp(Timestamp),
26 #[error("protocol mismatch")]
29 ProtocolMismatch,
30 #[error("peer misbehaved")]
32 Misbehavior,
33 #[error("peer timed out")]
35 Timeout,
36}
37
38impl Error {
39 pub fn severity(&self) -> Severity {
41 match self {
42 Self::InvalidTimestamp(_) => Severity::High,
43 Self::ProtocolMismatch => Severity::High,
44 Self::Misbehavior => Severity::High,
45 Self::Timeout => Severity::Low,
46 }
47 }
48}
49
50#[derive(thiserror::Error, Debug, Clone)]
52pub enum QueueError {
53 #[error("item is already queued")]
55 Duplicate(QueuedFetch),
56 #[error("queue capacity reached")]
58 CapacityReached(QueuedFetch),
59}
60
61impl QueueError {
62 pub fn inner(&self) -> &QueuedFetch {
64 match self {
65 Self::Duplicate(f) => f,
66 Self::CapacityReached(f) => f,
67 }
68 }
69}
70
71#[derive(Debug, Clone)]
73pub struct QueuedFetch {
74 pub rid: RepoId,
76 pub from: NodeId,
78 pub refs_at: Vec<RefsAt>,
80 pub timeout: time::Duration,
82 pub channel: Option<chan::Sender<FetchResult>>,
84}
85
86impl PartialEq for QueuedFetch {
87 fn eq(&self, other: &Self) -> bool {
88 self.rid == other.rid
89 && self.from == other.from
90 && self.refs_at == other.refs_at
91 && self.channel.is_none()
92 && other.channel.is_none()
93 }
94}
95
96#[derive(Debug, Clone)]
98pub struct Session {
99 pub id: NodeId,
101 pub addr: Address,
103 pub link: Link,
105 pub persistent: bool,
108 pub state: State,
110 pub subscribe: Option<message::Subscribe>,
112 pub last_active: LocalTime,
114 pub queue: VecDeque<QueuedFetch>,
116
117 attempts: usize,
121 rng: Rng,
123 limits: Limits,
125}
126
127impl fmt::Display for Session {
128 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
129 let mut attrs = Vec::new();
130 let state = self.state.to_string();
131
132 if self.link.is_inbound() {
133 attrs.push("inbound");
134 } else {
135 attrs.push("outbound");
136 }
137 if self.persistent {
138 attrs.push("persistent");
139 }
140 attrs.push(state.as_str());
141
142 write!(f, "{} [{}]", self.id, attrs.join(" "))
143 }
144}
145
146impl From<&Session> for radicle::node::Session {
147 fn from(s: &Session) -> Self {
148 Self {
149 nid: s.id,
150 link: if s.link.is_inbound() {
151 radicle::node::Link::Inbound
152 } else {
153 radicle::node::Link::Outbound
154 },
155 addr: s.addr.clone(),
156 state: s.state.clone(),
157 }
158 }
159}
160
161impl Session {
162 pub fn outbound(id: NodeId, addr: Address, persistent: bool, rng: Rng, limits: Limits) -> Self {
163 Self {
164 id,
165 addr,
166 state: State::Initial,
167 link: Link::Outbound,
168 subscribe: None,
169 persistent,
170 last_active: LocalTime::default(),
171 queue: VecDeque::with_capacity(MAX_FETCH_QUEUE_SIZE),
172 attempts: 1,
173 rng,
174 limits,
175 }
176 }
177
178 pub fn inbound(
179 id: NodeId,
180 addr: Address,
181 persistent: bool,
182 rng: Rng,
183 time: LocalTime,
184 limits: Limits,
185 ) -> Self {
186 Self {
187 id,
188 addr,
189 state: State::Connected {
190 since: time,
191 ping: PingState::default(),
192 fetching: HashSet::default(),
193 latencies: VecDeque::default(),
194 stable: false,
195 },
196 link: Link::Inbound,
197 subscribe: None,
198 persistent,
199 last_active: time,
200 queue: VecDeque::new(),
201 attempts: 0,
202 rng,
203 limits,
204 }
205 }
206
207 pub fn is_connecting(&self) -> bool {
208 matches!(self.state, State::Attempted)
209 }
210
211 pub fn is_stable(&self) -> bool {
212 matches!(self.state, State::Connected { stable: true, .. })
213 }
214
215 pub fn is_connected(&self) -> bool {
216 self.state.is_connected()
217 }
218
219 pub fn is_disconnected(&self) -> bool {
220 matches!(self.state, State::Disconnected { .. })
221 }
222
223 pub fn is_initial(&self) -> bool {
224 matches!(self.state, State::Initial)
225 }
226
227 pub fn is_at_capacity(&self) -> bool {
228 if let State::Connected { fetching, .. } = &self.state {
229 if fetching.len() >= self.limits.fetch_concurrency.into() {
230 return true;
231 }
232 }
233 false
234 }
235
236 pub fn is_fetching(&self, rid: &RepoId) -> bool {
237 if let State::Connected { fetching, .. } = &self.state {
238 return fetching.contains(rid);
239 }
240 false
241 }
242
243 pub fn queue_fetch(&mut self, fetch: QueuedFetch) -> Result<(), QueueError> {
246 assert_eq!(fetch.from, self.id);
247
248 if self.queue.len() >= MAX_FETCH_QUEUE_SIZE {
249 return Err(QueueError::CapacityReached(fetch));
250 } else if self.queue.contains(&fetch) {
251 return Err(QueueError::Duplicate(fetch));
252 }
253 self.queue.push_back(fetch);
254
255 Ok(())
256 }
257
258 pub fn dequeue_fetch(&mut self) -> Option<QueuedFetch> {
259 self.queue.pop_front()
260 }
261
262 pub fn attempts(&self) -> usize {
263 self.attempts
264 }
265
266 pub fn idle(&mut self, now: LocalTime) {
268 if let State::Connected {
269 since,
270 ref mut stable,
271 ..
272 } = self.state
273 {
274 if now >= since && now.duration_since(since) >= CONNECTION_STABLE_THRESHOLD {
275 *stable = true;
276 self.attempts = 0;
278 }
279 }
280 }
281
282 pub fn fetching(&mut self, rid: RepoId) {
288 if let State::Connected { fetching, .. } = &mut self.state {
289 assert!(
290 fetching.insert(rid),
291 "Session must not already be fetching {rid}"
292 );
293 } else {
294 panic!(
295 "Attempting to fetch {rid} from disconnected session {}",
296 self.id
297 );
298 }
299 }
300
301 pub fn fetched(&mut self, rid: RepoId) {
302 if let State::Connected { fetching, .. } = &mut self.state {
303 if !fetching.remove(&rid) {
304 log::warn!(target: "service", "Fetched unknown repository {rid}");
305 }
306 }
307 }
308
309 pub fn to_attempted(&mut self) {
310 assert!(
311 self.is_initial(),
312 "Can only transition to 'attempted' state from 'initial' state"
313 );
314 self.state = State::Attempted;
315 self.attempts += 1;
316 }
317
318 pub fn to_connected(&mut self, since: LocalTime) {
319 self.last_active = since;
320
321 if let State::Connected { .. } = &self.state {
322 log::error!(target: "service", "Session {} is already in 'connected' state, resetting..", self.id);
323 };
324 self.state = State::Connected {
325 since,
326 ping: PingState::default(),
327 fetching: HashSet::default(),
328 latencies: VecDeque::default(),
329 stable: false,
330 };
331 }
332
333 pub fn to_disconnected(&mut self, since: LocalTime, retry_at: LocalTime) {
336 self.state = State::Disconnected { since, retry_at };
337 }
338
339 pub fn to_initial(&mut self) {
342 assert!(
343 self.is_disconnected(),
344 "Can only transition to 'initial' state from 'disconnected' state"
345 );
346 self.state = State::Initial;
347 }
348
349 pub fn ping(&mut self, since: LocalTime, reactor: &mut Outbox) -> Result<(), Error> {
350 if let State::Connected { ping, .. } = &mut self.state {
351 let msg = message::Ping::new(&mut self.rng);
352 *ping = PingState::AwaitingResponse {
353 len: msg.ponglen,
354 since,
355 };
356 reactor.write(self, Message::Ping(msg));
357 }
358 Ok(())
359 }
360}