1use std::{
4 future::Future,
5 time::{Duration, Instant},
6};
7
8use iroh::{Endpoint, NodeAddr, PublicKey};
9use serde::{Deserialize, Serialize};
10use tracing::{debug, error_span, trace, Instrument};
11
12use crate::{
13 actor::SyncHandle,
14 metrics::Metrics,
15 net::codec::{run_alice, BobState},
16 NamespaceId, SyncOutcome,
17};
18
19pub const ALPN: &[u8] = b"/iroh-sync/1";
21
22mod codec;
23
24pub async fn connect_and_sync(
26 endpoint: &Endpoint,
27 sync: &SyncHandle,
28 namespace: NamespaceId,
29 peer: NodeAddr,
30 metrics: Option<&Metrics>,
31) -> Result<SyncFinished, ConnectError> {
32 let t_start = Instant::now();
33 let peer_id = peer.node_id;
34 trace!("connect");
35 let connection = endpoint
36 .connect(peer, crate::ALPN)
37 .await
38 .map_err(ConnectError::connect)?;
39
40 let (mut send_stream, mut recv_stream) =
41 connection.open_bi().await.map_err(ConnectError::connect)?;
42
43 let t_connect = t_start.elapsed();
44 debug!(?t_connect, "connected");
45
46 let res = run_alice(&mut send_stream, &mut recv_stream, sync, namespace, peer_id).await;
47
48 send_stream.finish().map_err(ConnectError::close)?;
49 send_stream.stopped().await.map_err(ConnectError::close)?;
50 recv_stream
51 .read_to_end(0)
52 .await
53 .map_err(ConnectError::close)?;
54
55 if let Some(metrics) = metrics {
56 if res.is_ok() {
57 metrics.sync_via_connect_success.inc();
58 } else {
59 metrics.sync_via_connect_failure.inc();
60 }
61 }
62
63 let t_process = t_start.elapsed() - t_connect;
64 match &res {
65 Ok(res) => {
66 debug!(
67 ?t_connect,
68 ?t_process,
69 sent = %res.num_sent,
70 recv = %res.num_recv,
71 "done, ok"
72 );
73 }
74 Err(err) => {
75 debug!(?t_connect, ?t_process, ?err, "done, failed");
76 }
77 }
78
79 let outcome = res?;
80
81 let timings = Timings {
82 connect: t_connect,
83 process: t_process,
84 };
85
86 let res = SyncFinished {
87 namespace,
88 peer: peer_id,
89 outcome,
90 timings,
91 };
92
93 Ok(res)
94}
95
96#[derive(Debug, Clone)]
98pub enum AcceptOutcome {
99 Allow,
101 Reject(AbortReason),
103}
104
105pub async fn handle_connection<F, Fut>(
107 sync: SyncHandle,
108 connection: iroh::endpoint::Connection,
109 accept_cb: F,
110 metrics: Option<&Metrics>,
111) -> Result<SyncFinished, AcceptError>
112where
113 F: Fn(NamespaceId, PublicKey) -> Fut,
114 Fut: Future<Output = AcceptOutcome>,
115{
116 let t_start = Instant::now();
117 let peer = connection.remote_node_id().map_err(AcceptError::connect)?;
118 let (mut send_stream, mut recv_stream) = connection
119 .accept_bi()
120 .await
121 .map_err(|e| AcceptError::open(peer, e))?;
122
123 let t_connect = t_start.elapsed();
124 let span = error_span!("accept", peer = %peer.fmt_short(), namespace = tracing::field::Empty);
125 span.in_scope(|| {
126 debug!(?t_connect, "connection established");
127 });
128
129 let mut state = BobState::new(peer);
130 let res = state
131 .run(&mut send_stream, &mut recv_stream, sync, accept_cb)
132 .instrument(span.clone())
133 .await;
134
135 if let Some(metrics) = metrics {
136 if res.is_ok() {
137 metrics.sync_via_accept_success.inc();
138 } else {
139 metrics.sync_via_accept_failure.inc();
140 }
141 }
142
143 let namespace = state.namespace();
144 let outcome = state.into_outcome();
145
146 send_stream
147 .finish()
148 .map_err(|error| AcceptError::close(peer, namespace, error))?;
149 send_stream
150 .stopped()
151 .await
152 .map_err(|error| AcceptError::close(peer, namespace, error))?;
153 recv_stream
154 .read_to_end(0)
155 .await
156 .map_err(|error| AcceptError::close(peer, namespace, error))?;
157
158 let t_process = t_start.elapsed() - t_connect;
159 span.in_scope(|| match &res {
160 Ok(_res) => {
161 debug!(
162 ?t_connect,
163 ?t_process,
164 sent = %outcome.num_sent,
165 recv = %outcome.num_recv,
166 "done, ok"
167 );
168 }
169 Err(err) => {
170 debug!(?t_connect, ?t_process, ?err, "done, failed");
171 }
172 });
173
174 let namespace = res?;
175
176 let timings = Timings {
177 connect: t_connect,
178 process: t_process,
179 };
180 let res = SyncFinished {
181 namespace,
182 outcome,
183 peer,
184 timings,
185 };
186
187 Ok(res)
188}
189
190#[derive(Debug, Clone)]
192pub struct SyncFinished {
193 pub namespace: NamespaceId,
195 pub peer: PublicKey,
197 pub outcome: SyncOutcome,
199 pub timings: Timings,
201}
202
203#[derive(Debug, Default, Clone)]
205pub struct Timings {
206 pub connect: Duration,
208 pub process: Duration,
210}
211
212#[derive(thiserror::Error, Debug)]
214#[allow(missing_docs)]
215pub enum AcceptError {
216 #[error("Failed to establish connection")]
218 Connect {
219 #[source]
220 error: anyhow::Error,
221 },
222 #[error("Failed to open replica with {peer:?}")]
224 Open {
225 peer: PublicKey,
226 #[source]
227 error: anyhow::Error,
228 },
229 #[error("Aborted sync of {namespace:?} with {peer:?}: {reason:?}")]
231 Abort {
232 peer: PublicKey,
233 namespace: NamespaceId,
234 reason: AbortReason,
235 },
236 #[error("Failed to sync {namespace:?} with {peer:?}")]
238 Sync {
239 peer: PublicKey,
240 namespace: Option<NamespaceId>,
241 #[source]
242 error: anyhow::Error,
243 },
244 #[error("Failed to close {namespace:?} with {peer:?}")]
246 Close {
247 peer: PublicKey,
248 namespace: Option<NamespaceId>,
249 #[source]
250 error: anyhow::Error,
251 },
252}
253
254#[derive(thiserror::Error, Debug)]
256#[allow(missing_docs)]
257pub enum ConnectError {
258 #[error("Failed to establish connection")]
260 Connect {
261 #[source]
262 error: anyhow::Error,
263 },
264 #[error("Remote peer aborted sync: {0:?}")]
266 RemoteAbort(AbortReason),
267 #[error("Failed to sync")]
269 Sync {
270 #[source]
271 error: anyhow::Error,
272 },
273 #[error("Failed to close connection1")]
275 Close {
276 #[source]
277 error: anyhow::Error,
278 },
279}
280
281#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
283pub enum AbortReason {
284 NotFound,
286 AlreadySyncing,
288 InternalServerError,
290}
291
292impl AcceptError {
293 fn connect(error: impl Into<anyhow::Error>) -> Self {
294 Self::Connect {
295 error: error.into(),
296 }
297 }
298 fn open(peer: PublicKey, error: impl Into<anyhow::Error>) -> Self {
299 Self::Open {
300 peer,
301 error: error.into(),
302 }
303 }
304 pub(crate) fn sync(
305 peer: PublicKey,
306 namespace: Option<NamespaceId>,
307 error: impl Into<anyhow::Error>,
308 ) -> Self {
309 Self::Sync {
310 peer,
311 namespace,
312 error: error.into(),
313 }
314 }
315 fn close(
316 peer: PublicKey,
317 namespace: Option<NamespaceId>,
318 error: impl Into<anyhow::Error>,
319 ) -> Self {
320 Self::Close {
321 peer,
322 namespace,
323 error: error.into(),
324 }
325 }
326 pub fn peer(&self) -> Option<PublicKey> {
328 match self {
329 AcceptError::Connect { .. } => None,
330 AcceptError::Open { peer, .. } => Some(*peer),
331 AcceptError::Sync { peer, .. } => Some(*peer),
332 AcceptError::Close { peer, .. } => Some(*peer),
333 AcceptError::Abort { peer, .. } => Some(*peer),
334 }
335 }
336
337 pub fn namespace(&self) -> Option<NamespaceId> {
339 match self {
340 AcceptError::Connect { .. } => None,
341 AcceptError::Open { .. } => None,
342 AcceptError::Sync { namespace, .. } => namespace.to_owned(),
343 AcceptError::Close { namespace, .. } => namespace.to_owned(),
344 AcceptError::Abort { namespace, .. } => Some(*namespace),
345 }
346 }
347}
348
349impl ConnectError {
350 fn connect(error: impl Into<anyhow::Error>) -> Self {
351 Self::Connect {
352 error: error.into(),
353 }
354 }
355 fn close(error: impl Into<anyhow::Error>) -> Self {
356 Self::Close {
357 error: error.into(),
358 }
359 }
360 pub(crate) fn sync(error: impl Into<anyhow::Error>) -> Self {
361 Self::Sync {
362 error: error.into(),
363 }
364 }
365 pub(crate) fn remote_abort(reason: AbortReason) -> Self {
366 Self::RemoteAbort(reason)
367 }
368}