radicle_protocol/service/
session.rs1use std::collections::VecDeque;
2use std::{fmt, time};
3
4use crossbeam_channel as chan;
5use radicle::node::{FetchResult, Severity};
6use radicle::node::{Link, Timestamp};
7pub use radicle::node::{PingState, State};
8use radicle::storage::refs::RefsAt;
9
10use crate::service::message;
11use crate::service::message::Message;
12use crate::service::{Address, LocalDuration, LocalTime, NodeId, Outbox, RepoId, Rng};
13
14pub const CONNECTION_STABLE_THRESHOLD: LocalDuration = LocalDuration::from_mins(1);
16pub const MAX_FETCH_QUEUE_SIZE: usize = 128;
18
19#[derive(thiserror::Error, Debug, Clone, Copy)]
20pub enum Error {
21 #[error("invalid announcement timestamp: {0}")]
24 InvalidTimestamp(Timestamp),
25 #[error("protocol mismatch")]
28 ProtocolMismatch,
29 #[error("peer misbehaved")]
31 Misbehavior,
32 #[error("peer timed out")]
34 Timeout,
35}
36
37impl Error {
38 pub fn severity(&self) -> Severity {
40 match self {
41 Self::InvalidTimestamp(_) => Severity::High,
42 Self::ProtocolMismatch => Severity::High,
43 Self::Misbehavior => Severity::High,
44 Self::Timeout => Severity::Low,
45 }
46 }
47}
48
49#[derive(thiserror::Error, Debug, Clone)]
51pub enum QueueError {
52 #[error("item is already queued")]
54 Duplicate(QueuedFetch),
55 #[error("queue capacity reached")]
57 CapacityReached(QueuedFetch),
58}
59
60impl QueueError {
61 pub fn inner(&self) -> &QueuedFetch {
63 match self {
64 Self::Duplicate(f) => f,
65 Self::CapacityReached(f) => f,
66 }
67 }
68}
69
70#[derive(Debug, Clone)]
72pub struct QueuedFetch {
73 pub rid: RepoId,
75 pub from: NodeId,
77 pub refs_at: Vec<RefsAt>,
79 pub timeout: time::Duration,
81 pub channel: Option<chan::Sender<FetchResult>>,
83}
84
85impl PartialEq for QueuedFetch {
86 fn eq(&self, other: &Self) -> bool {
87 self.rid == other.rid
88 && self.from == other.from
89 && self.refs_at == other.refs_at
90 && self.channel.is_none()
91 && other.channel.is_none()
92 }
93}
94
95#[derive(Debug, Clone)]
97pub struct Session {
98 pub id: NodeId,
100 pub addr: Address,
102 pub link: Link,
104 pub persistent: bool,
107 pub state: State,
109 pub subscribe: Option<message::Subscribe>,
111 pub last_active: LocalTime,
113
114 attempts: usize,
118 rng: Rng,
120}
121
122impl fmt::Display for Session {
123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 let mut attrs = Vec::new();
125 let state = self.state.to_string();
126
127 if self.link.is_inbound() {
128 attrs.push("inbound");
129 } else {
130 attrs.push("outbound");
131 }
132 if self.persistent {
133 attrs.push("persistent");
134 }
135 attrs.push(state.as_str());
136
137 write!(f, "{} [{}]", self.id, attrs.join(" "))
138 }
139}
140
141impl From<&Session> for radicle::node::Session {
142 fn from(s: &Session) -> Self {
143 Self {
144 nid: s.id,
145 link: if s.link.is_inbound() {
146 radicle::node::Link::Inbound
147 } else {
148 radicle::node::Link::Outbound
149 },
150 addr: s.addr.clone(),
151 state: s.state.clone(),
152 }
153 }
154}
155
156impl Session {
157 pub fn outbound(id: NodeId, addr: Address, persistent: bool, rng: Rng) -> Self {
158 Self {
159 id,
160 addr,
161 state: State::Initial,
162 link: Link::Outbound,
163 subscribe: None,
164 persistent,
165 last_active: LocalTime::default(),
166 attempts: 1,
167 rng,
168 }
169 }
170
171 pub fn inbound(id: NodeId, addr: Address, persistent: bool, rng: Rng, time: LocalTime) -> Self {
172 Self {
173 id,
174 addr,
175 state: State::Connected {
176 since: time,
177 ping: PingState::default(),
178 latencies: VecDeque::default(),
179 stable: false,
180 },
181 link: Link::Inbound,
182 subscribe: None,
183 persistent,
184 last_active: time,
185 attempts: 0,
186 rng,
187 }
188 }
189
190 pub fn is_connecting(&self) -> bool {
191 matches!(self.state, State::Attempted)
192 }
193
194 pub fn is_stable(&self) -> bool {
195 matches!(self.state, State::Connected { stable: true, .. })
196 }
197
198 pub fn is_connected(&self) -> bool {
199 self.state.is_connected()
200 }
201
202 pub fn is_disconnected(&self) -> bool {
203 matches!(self.state, State::Disconnected { .. })
204 }
205
206 pub fn is_initial(&self) -> bool {
207 matches!(self.state, State::Initial)
208 }
209
210 pub fn attempts(&self) -> usize {
211 self.attempts
212 }
213
214 pub fn idle(&mut self, now: LocalTime) {
216 if let State::Connected {
217 since,
218 ref mut stable,
219 ..
220 } = self.state
221 {
222 if now >= since && now.duration_since(since) >= CONNECTION_STABLE_THRESHOLD {
223 *stable = true;
224 self.attempts = 0;
226 }
227 }
228 }
229
230 pub fn to_attempted(&mut self) {
231 assert!(
232 self.is_initial(),
233 "Can only transition to 'attempted' state from 'initial' state"
234 );
235 self.state = State::Attempted;
236 self.attempts += 1;
237 }
238
239 pub fn to_connected(&mut self, since: LocalTime) {
240 self.last_active = since;
241
242 if let State::Connected { .. } = &self.state {
243 log::debug!(target: "service", "Session {} is already in 'connected' state, resetting..", self.id);
244 };
245 self.state = State::Connected {
246 since,
247 ping: PingState::default(),
248 latencies: VecDeque::default(),
249 stable: false,
250 };
251 }
252
253 pub fn to_disconnected(&mut self, since: LocalTime, retry_at: LocalTime) {
256 self.state = State::Disconnected { since, retry_at };
257 }
258
259 pub fn to_initial(&mut self) {
262 assert!(
263 self.is_disconnected(),
264 "Can only transition to 'initial' state from 'disconnected' state"
265 );
266 self.state = State::Initial;
267 }
268
269 pub fn ping(&mut self, since: LocalTime, reactor: &mut Outbox) -> Result<(), Error> {
270 if let State::Connected { ping, .. } = &mut self.state {
271 let msg = message::Ping::new(&mut self.rng);
272 *ping = PingState::AwaitingResponse {
273 len: msg.ponglen,
274 since,
275 };
276 reactor.write(self, Message::Ping(msg));
277 }
278 Ok(())
279 }
280}