1use std::collections::{HashSet, VecDeque};
2use std::{fmt, time};
3
4use crossbeam_channel as chan;
5
6use crate::node::config::Limits;
7use crate::node::{FetchResult, Severity};
8use crate::service::message;
9use crate::service::message::Message;
10use crate::service::{Address, LocalDuration, LocalTime, NodeId, Outbox, RepoId, Rng};
11use crate::storage::refs::RefsAt;
12use crate::{Link, Timestamp};
13
14pub use crate::node::{PingState, State};
15
16pub const CONNECTION_STABLE_THRESHOLD: LocalDuration = LocalDuration::from_mins(1);
18pub const MAX_FETCH_QUEUE_SIZE: usize = 128;
20
21#[derive(thiserror::Error, Debug, Clone, Copy)]
22pub enum Error {
23 #[error("invalid announcement timestamp: {0}")]
26 InvalidTimestamp(Timestamp),
27 #[error("protocol mismatch")]
30 ProtocolMismatch,
31 #[error("peer misbehaved")]
33 Misbehavior,
34 #[error("peer timed out")]
36 Timeout,
37}
38
39impl Error {
40 pub fn severity(&self) -> Severity {
42 match self {
43 Self::InvalidTimestamp(_) => Severity::High,
44 Self::ProtocolMismatch => Severity::High,
45 Self::Misbehavior => Severity::High,
46 Self::Timeout => Severity::Low,
47 }
48 }
49}
50
51#[derive(thiserror::Error, Debug, Clone)]
53pub enum QueueError {
54 #[error("item is already queued")]
56 Duplicate(QueuedFetch),
57 #[error("queue capacity reached")]
59 CapacityReached(QueuedFetch),
60}
61
62impl QueueError {
63 pub fn inner(&self) -> &QueuedFetch {
65 match self {
66 Self::Duplicate(f) => f,
67 Self::CapacityReached(f) => f,
68 }
69 }
70}
71
72#[derive(Debug, Clone)]
74pub struct QueuedFetch {
75 pub rid: RepoId,
77 pub from: NodeId,
79 pub refs_at: Vec<RefsAt>,
81 pub timeout: time::Duration,
83 pub channel: Option<chan::Sender<FetchResult>>,
85}
86
87impl PartialEq for QueuedFetch {
88 fn eq(&self, other: &Self) -> bool {
89 self.rid == other.rid
90 && self.from == other.from
91 && self.refs_at == other.refs_at
92 && self.channel.is_none()
93 && other.channel.is_none()
94 }
95}
96
97#[derive(Debug, Clone)]
99pub struct Session {
100 pub id: NodeId,
102 pub addr: Address,
104 pub link: Link,
106 pub persistent: bool,
109 pub state: State,
111 pub subscribe: Option<message::Subscribe>,
113 pub last_active: LocalTime,
115 pub queue: VecDeque<QueuedFetch>,
117
118 attempts: usize,
122 rng: Rng,
124 limits: Limits,
126}
127
128impl fmt::Display for Session {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 let mut attrs = Vec::new();
131 let state = self.state.to_string();
132
133 if self.link.is_inbound() {
134 attrs.push("inbound");
135 } else {
136 attrs.push("outbound");
137 }
138 if self.persistent {
139 attrs.push("persistent");
140 }
141 attrs.push(state.as_str());
142
143 write!(f, "{} [{}]", self.id, attrs.join(" "))
144 }
145}
146
147impl From<&Session> for radicle::node::Session {
148 fn from(s: &Session) -> Self {
149 Self {
150 nid: s.id,
151 link: if s.link.is_inbound() {
152 radicle::node::Link::Inbound
153 } else {
154 radicle::node::Link::Outbound
155 },
156 addr: s.addr.clone(),
157 state: s.state.clone(),
158 }
159 }
160}
161
162impl Session {
163 pub fn outbound(id: NodeId, addr: Address, persistent: bool, rng: Rng, limits: Limits) -> Self {
164 Self {
165 id,
166 addr,
167 state: State::Initial,
168 link: Link::Outbound,
169 subscribe: None,
170 persistent,
171 last_active: LocalTime::default(),
172 queue: VecDeque::with_capacity(MAX_FETCH_QUEUE_SIZE),
173 attempts: 1,
174 rng,
175 limits,
176 }
177 }
178
179 pub fn inbound(
180 id: NodeId,
181 addr: Address,
182 persistent: bool,
183 rng: Rng,
184 time: LocalTime,
185 limits: Limits,
186 ) -> Self {
187 Self {
188 id,
189 addr,
190 state: State::Connected {
191 since: time,
192 ping: PingState::default(),
193 fetching: HashSet::default(),
194 latencies: VecDeque::default(),
195 stable: false,
196 },
197 link: Link::Inbound,
198 subscribe: None,
199 persistent,
200 last_active: time,
201 queue: VecDeque::new(),
202 attempts: 0,
203 rng,
204 limits,
205 }
206 }
207
208 pub fn is_connecting(&self) -> bool {
209 matches!(self.state, State::Attempted { .. })
210 }
211
212 pub fn is_stable(&self) -> bool {
213 matches!(self.state, State::Connected { stable: true, .. })
214 }
215
216 pub fn is_connected(&self) -> bool {
217 self.state.is_connected()
218 }
219
220 pub fn is_disconnected(&self) -> bool {
221 matches!(self.state, State::Disconnected { .. })
222 }
223
224 pub fn is_initial(&self) -> bool {
225 matches!(self.state, State::Initial)
226 }
227
228 pub fn is_at_capacity(&self) -> bool {
229 if let State::Connected { fetching, .. } = &self.state {
230 if fetching.len() >= self.limits.fetch_concurrency {
231 return true;
232 }
233 }
234 false
235 }
236
237 pub fn is_fetching(&self, rid: &RepoId) -> bool {
238 if let State::Connected { fetching, .. } = &self.state {
239 return fetching.contains(rid);
240 }
241 false
242 }
243
244 pub fn queue_fetch(&mut self, fetch: QueuedFetch) -> Result<(), QueueError> {
247 assert_eq!(fetch.from, self.id);
248
249 if self.queue.len() >= MAX_FETCH_QUEUE_SIZE {
250 return Err(QueueError::CapacityReached(fetch));
251 } else if self.queue.contains(&fetch) {
252 return Err(QueueError::Duplicate(fetch));
253 }
254 self.queue.push_back(fetch);
255
256 Ok(())
257 }
258
259 pub fn dequeue_fetch(&mut self) -> Option<QueuedFetch> {
260 self.queue.pop_front()
261 }
262
263 pub fn attempts(&self) -> usize {
264 self.attempts
265 }
266
267 pub fn idle(&mut self, now: LocalTime) {
269 if let State::Connected {
270 since,
271 ref mut stable,
272 ..
273 } = self.state
274 {
275 if now >= since && now.duration_since(since) >= CONNECTION_STABLE_THRESHOLD {
276 *stable = true;
277 self.attempts = 0;
279 }
280 }
281 }
282
283 pub fn fetching(&mut self, rid: RepoId) {
289 if let State::Connected { fetching, .. } = &mut self.state {
290 assert!(
291 fetching.insert(rid),
292 "Session must not already be fetching {rid}"
293 );
294 } else {
295 panic!(
296 "Attempting to fetch {rid} from disconnected session {}",
297 self.id
298 );
299 }
300 }
301
302 pub fn fetched(&mut self, rid: RepoId) {
303 if let State::Connected { fetching, .. } = &mut self.state {
304 if !fetching.remove(&rid) {
305 log::warn!(target: "service", "Fetched unknown repository {rid}");
306 }
307 }
308 }
309
310 pub fn to_attempted(&mut self) {
311 assert!(
312 self.is_initial(),
313 "Can only transition to 'attempted' state from 'initial' state"
314 );
315 self.state = State::Attempted;
316 self.attempts += 1;
317 }
318
319 pub fn to_connected(&mut self, since: LocalTime) {
320 self.last_active = since;
321
322 if let State::Connected { .. } = &self.state {
323 log::error!(target: "service", "Session {} is already in 'connected' state, resetting..", self.id);
324 };
325 self.state = State::Connected {
326 since,
327 ping: PingState::default(),
328 fetching: HashSet::default(),
329 latencies: VecDeque::default(),
330 stable: false,
331 };
332 }
333
334 pub fn to_disconnected(&mut self, since: LocalTime, retry_at: LocalTime) {
337 self.state = State::Disconnected { since, retry_at };
338 }
339
340 pub fn to_initial(&mut self) {
343 assert!(
344 self.is_disconnected(),
345 "Can only transition to 'initial' state from 'disconnected' state"
346 );
347 self.state = State::Initial;
348 }
349
350 pub fn ping(&mut self, since: LocalTime, reactor: &mut Outbox) -> Result<(), Error> {
351 if let State::Connected { ping, .. } = &mut self.state {
352 let msg = message::Ping::new(&mut self.rng);
353 *ping = PingState::AwaitingResponse {
354 len: msg.ponglen,
355 since,
356 };
357 reactor.write(self, Message::Ping(msg));
358 }
359 Ok(())
360 }
361}