1use std::{
4 future::Future,
5 time::{Duration, Instant},
6};
7
8use iroh_net::{key::PublicKey, magic_endpoint::get_remote_node_id, MagicEndpoint, NodeAddr};
9use serde::{Deserialize, Serialize};
10use tracing::{debug, error_span, trace, Instrument};
11
12use crate::{
13 actor::SyncHandle,
14 net::codec::{run_alice, BobState},
15 NamespaceId, SyncOutcome,
16};
17
18#[cfg(feature = "metrics")]
19use crate::metrics::Metrics;
20#[cfg(feature = "metrics")]
21use iroh_metrics::inc;
22
23pub const SYNC_ALPN: &[u8] = b"/iroh-sync/1";
25
26mod codec;
27
28pub async fn connect_and_sync(
30 endpoint: &MagicEndpoint,
31 sync: &SyncHandle,
32 namespace: NamespaceId,
33 peer: NodeAddr,
34) -> Result<SyncFinished, ConnectError> {
35 let t_start = Instant::now();
36 let peer_id = peer.node_id;
37 trace!("connect");
38 let connection = endpoint
39 .connect(peer, SYNC_ALPN)
40 .await
41 .map_err(ConnectError::connect)?;
42
43 let (mut send_stream, mut recv_stream) =
44 connection.open_bi().await.map_err(ConnectError::connect)?;
45
46 let t_connect = t_start.elapsed();
47 debug!(?t_connect, "connected");
48
49 let res = run_alice(&mut send_stream, &mut recv_stream, sync, namespace, peer_id).await;
50
51 send_stream.finish().await.map_err(ConnectError::close)?;
52 recv_stream
53 .read_to_end(0)
54 .await
55 .map_err(ConnectError::close)?;
56
57 #[cfg(feature = "metrics")]
58 if res.is_ok() {
59 inc!(Metrics, sync_via_connect_success);
60 } else {
61 inc!(Metrics, sync_via_connect_failure);
62 }
63
64 let t_process = t_start.elapsed() - t_connect;
65 match &res {
66 Ok(res) => {
67 debug!(
68 ?t_connect,
69 ?t_process,
70 sent = %res.num_sent,
71 recv = %res.num_recv,
72 "done, ok"
73 );
74 }
75 Err(err) => {
76 debug!(?t_connect, ?t_process, ?err, "done, failed");
77 }
78 }
79
80 let outcome = res?;
81
82 let timings = Timings {
83 connect: t_connect,
84 process: t_process,
85 };
86
87 let res = SyncFinished {
88 namespace,
89 peer: peer_id,
90 outcome,
91 timings,
92 };
93
94 Ok(res)
95}
96
97#[derive(Debug, Clone)]
99pub enum AcceptOutcome {
100 Allow,
102 Reject(AbortReason),
104}
105
106pub async fn handle_connection<F, Fut>(
108 sync: SyncHandle,
109 connecting: quinn::Connecting,
110 accept_cb: F,
111) -> Result<SyncFinished, AcceptError>
112where
113 F: Fn(NamespaceId, PublicKey) -> Fut,
114 Fut: Future<Output = AcceptOutcome>,
115{
116 let t_start = Instant::now();
117 let connection = connecting.await.map_err(AcceptError::connect)?;
118 let peer = get_remote_node_id(&connection).map_err(AcceptError::connect)?;
119 let (mut send_stream, mut recv_stream) = connection
120 .accept_bi()
121 .await
122 .map_err(|e| AcceptError::open(peer, e))?;
123
124 let t_connect = t_start.elapsed();
125 let span = error_span!("accept", peer = %peer.fmt_short(), namespace = tracing::field::Empty);
126 span.in_scope(|| {
127 debug!(?t_connect, "connection established");
128 });
129
130 let mut state = BobState::new(peer);
131 let res = state
132 .run(&mut send_stream, &mut recv_stream, sync, accept_cb)
133 .instrument(span.clone())
134 .await;
135
136 #[cfg(feature = "metrics")]
137 if res.is_ok() {
138 inc!(Metrics, sync_via_accept_success);
139 } else {
140 inc!(Metrics, sync_via_accept_failure);
141 }
142
143 let namespace = state.namespace();
144 let outcome = state.into_outcome();
145
146 send_stream
147 .finish()
148 .await
149 .map_err(|error| AcceptError::close(peer, namespace, error))?;
150 recv_stream
151 .read_to_end(0)
152 .await
153 .map_err(|error| AcceptError::close(peer, namespace, error))?;
154
155 let t_process = t_start.elapsed() - t_connect;
156 span.in_scope(|| match &res {
157 Ok(_res) => {
158 debug!(
159 ?t_connect,
160 ?t_process,
161 sent = %outcome.num_sent,
162 recv = %outcome.num_recv,
163 "done, ok"
164 );
165 }
166 Err(err) => {
167 debug!(?t_connect, ?t_process, ?err, "done, failed");
168 }
169 });
170
171 let namespace = res?;
172
173 let timings = Timings {
174 connect: t_connect,
175 process: t_process,
176 };
177 let res = SyncFinished {
178 namespace,
179 outcome,
180 peer,
181 timings,
182 };
183
184 Ok(res)
185}
186
187#[derive(Debug, Clone)]
189pub struct SyncFinished {
190 pub namespace: NamespaceId,
192 pub peer: PublicKey,
194 pub outcome: SyncOutcome,
196 pub timings: Timings,
198}
199
200#[derive(Debug, Default, Clone)]
202pub struct Timings {
203 pub connect: Duration,
205 pub process: Duration,
207}
208
209#[derive(thiserror::Error, Debug)]
211#[allow(missing_docs)]
212pub enum AcceptError {
213 #[error("Failed to establish connection")]
215 Connect {
216 #[source]
217 error: anyhow::Error,
218 },
219 #[error("Failed to open replica with {peer:?}")]
221 Open {
222 peer: PublicKey,
223 #[source]
224 error: anyhow::Error,
225 },
226 #[error("Aborted sync of {namespace:?} with {peer:?}: {reason:?}")]
228 Abort {
229 peer: PublicKey,
230 namespace: NamespaceId,
231 reason: AbortReason,
232 },
233 #[error("Failed to sync {namespace:?} with {peer:?}")]
235 Sync {
236 peer: PublicKey,
237 namespace: Option<NamespaceId>,
238 #[source]
239 error: anyhow::Error,
240 },
241 #[error("Failed to close {namespace:?} with {peer:?}")]
243 Close {
244 peer: PublicKey,
245 namespace: Option<NamespaceId>,
246 #[source]
247 error: anyhow::Error,
248 },
249}
250
251#[derive(thiserror::Error, Debug)]
253#[allow(missing_docs)]
254pub enum ConnectError {
255 #[error("Failed to establish connection")]
257 Connect {
258 #[source]
259 error: anyhow::Error,
260 },
261 #[error("Remote peer aborted sync: {0:?}")]
263 RemoteAbort(AbortReason),
264 #[error("Failed to sync")]
266 Sync {
267 #[source]
268 error: anyhow::Error,
269 },
270 #[error("Failed to close connection1")]
272 Close {
273 #[source]
274 error: anyhow::Error,
275 },
276}
277
278#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
280pub enum AbortReason {
281 NotFound,
283 AlreadySyncing,
285 InternalServerError,
287}
288
289impl AcceptError {
290 fn connect(error: impl Into<anyhow::Error>) -> Self {
291 Self::Connect {
292 error: error.into(),
293 }
294 }
295 fn open(peer: PublicKey, error: impl Into<anyhow::Error>) -> Self {
296 Self::Open {
297 peer,
298 error: error.into(),
299 }
300 }
301 pub(crate) fn sync(
302 peer: PublicKey,
303 namespace: Option<NamespaceId>,
304 error: impl Into<anyhow::Error>,
305 ) -> Self {
306 Self::Sync {
307 peer,
308 namespace,
309 error: error.into(),
310 }
311 }
312 fn close(
313 peer: PublicKey,
314 namespace: Option<NamespaceId>,
315 error: impl Into<anyhow::Error>,
316 ) -> Self {
317 Self::Close {
318 peer,
319 namespace,
320 error: error.into(),
321 }
322 }
323 pub fn peer(&self) -> Option<PublicKey> {
325 match self {
326 AcceptError::Connect { .. } => None,
327 AcceptError::Open { peer, .. } => Some(*peer),
328 AcceptError::Sync { peer, .. } => Some(*peer),
329 AcceptError::Close { peer, .. } => Some(*peer),
330 AcceptError::Abort { peer, .. } => Some(*peer),
331 }
332 }
333
334 pub fn namespace(&self) -> Option<NamespaceId> {
336 match self {
337 AcceptError::Connect { .. } => None,
338 AcceptError::Open { .. } => None,
339 AcceptError::Sync { namespace, .. } => namespace.to_owned(),
340 AcceptError::Close { namespace, .. } => namespace.to_owned(),
341 AcceptError::Abort { namespace, .. } => Some(*namespace),
342 }
343 }
344}
345
346impl ConnectError {
347 fn connect(error: impl Into<anyhow::Error>) -> Self {
348 Self::Connect {
349 error: error.into(),
350 }
351 }
352 fn close(error: impl Into<anyhow::Error>) -> Self {
353 Self::Close {
354 error: error.into(),
355 }
356 }
357 pub(crate) fn sync(error: impl Into<anyhow::Error>) -> Self {
358 Self::Sync {
359 error: error.into(),
360 }
361 }
362 pub(crate) fn remote_abort(reason: AbortReason) -> Self {
363 Self::RemoteAbort(reason)
364 }
365}