iroh_sync/
net.rs

1//! Network implementation of the iroh-sync protocol
2
3use std::{
4    future::Future,
5    time::{Duration, Instant},
6};
7
8use iroh_net::{key::PublicKey, magic_endpoint::get_remote_node_id, MagicEndpoint, NodeAddr};
9use serde::{Deserialize, Serialize};
10use tracing::{debug, error_span, trace, Instrument};
11
12use crate::{
13    actor::SyncHandle,
14    net::codec::{run_alice, BobState},
15    NamespaceId, SyncOutcome,
16};
17
18#[cfg(feature = "metrics")]
19use crate::metrics::Metrics;
20#[cfg(feature = "metrics")]
21use iroh_metrics::inc;
22
/// ALPN protocol identifier for iroh-sync connections.
///
/// [`connect_and_sync`] passes this value when dialing a peer.
pub const SYNC_ALPN: &[u8] = b"/iroh-sync/1";
25
26mod codec;
27
28/// Connect to a peer and sync a replica
29pub async fn connect_and_sync(
30    endpoint: &MagicEndpoint,
31    sync: &SyncHandle,
32    namespace: NamespaceId,
33    peer: NodeAddr,
34) -> Result<SyncFinished, ConnectError> {
35    let t_start = Instant::now();
36    let peer_id = peer.node_id;
37    trace!("connect");
38    let connection = endpoint
39        .connect(peer, SYNC_ALPN)
40        .await
41        .map_err(ConnectError::connect)?;
42
43    let (mut send_stream, mut recv_stream) =
44        connection.open_bi().await.map_err(ConnectError::connect)?;
45
46    let t_connect = t_start.elapsed();
47    debug!(?t_connect, "connected");
48
49    let res = run_alice(&mut send_stream, &mut recv_stream, sync, namespace, peer_id).await;
50
51    send_stream.finish().await.map_err(ConnectError::close)?;
52    recv_stream
53        .read_to_end(0)
54        .await
55        .map_err(ConnectError::close)?;
56
57    #[cfg(feature = "metrics")]
58    if res.is_ok() {
59        inc!(Metrics, sync_via_connect_success);
60    } else {
61        inc!(Metrics, sync_via_connect_failure);
62    }
63
64    let t_process = t_start.elapsed() - t_connect;
65    match &res {
66        Ok(res) => {
67            debug!(
68                ?t_connect,
69                ?t_process,
70                sent = %res.num_sent,
71                recv = %res.num_recv,
72                "done, ok"
73            );
74        }
75        Err(err) => {
76            debug!(?t_connect, ?t_process, ?err, "done, failed");
77        }
78    }
79
80    let outcome = res?;
81
82    let timings = Timings {
83        connect: t_connect,
84        process: t_process,
85    };
86
87    let res = SyncFinished {
88        namespace,
89        peer: peer_id,
90        outcome,
91        timings,
92    };
93
94    Ok(res)
95}
96
97/// Whether we want to accept or reject an incoming sync request.
98#[derive(Debug, Clone)]
99pub enum AcceptOutcome {
100    /// Accept the sync request.
101    Allow,
102    /// Decline the sync request
103    Reject(AbortReason),
104}
105
106/// Handle an iroh-sync connection and sync all shared documents in the replica store.
107pub async fn handle_connection<F, Fut>(
108    sync: SyncHandle,
109    connecting: quinn::Connecting,
110    accept_cb: F,
111) -> Result<SyncFinished, AcceptError>
112where
113    F: Fn(NamespaceId, PublicKey) -> Fut,
114    Fut: Future<Output = AcceptOutcome>,
115{
116    let t_start = Instant::now();
117    let connection = connecting.await.map_err(AcceptError::connect)?;
118    let peer = get_remote_node_id(&connection).map_err(AcceptError::connect)?;
119    let (mut send_stream, mut recv_stream) = connection
120        .accept_bi()
121        .await
122        .map_err(|e| AcceptError::open(peer, e))?;
123
124    let t_connect = t_start.elapsed();
125    let span = error_span!("accept", peer = %peer.fmt_short(), namespace = tracing::field::Empty);
126    span.in_scope(|| {
127        debug!(?t_connect, "connection established");
128    });
129
130    let mut state = BobState::new(peer);
131    let res = state
132        .run(&mut send_stream, &mut recv_stream, sync, accept_cb)
133        .instrument(span.clone())
134        .await;
135
136    #[cfg(feature = "metrics")]
137    if res.is_ok() {
138        inc!(Metrics, sync_via_accept_success);
139    } else {
140        inc!(Metrics, sync_via_accept_failure);
141    }
142
143    let namespace = state.namespace();
144    let outcome = state.into_outcome();
145
146    send_stream
147        .finish()
148        .await
149        .map_err(|error| AcceptError::close(peer, namespace, error))?;
150    recv_stream
151        .read_to_end(0)
152        .await
153        .map_err(|error| AcceptError::close(peer, namespace, error))?;
154
155    let t_process = t_start.elapsed() - t_connect;
156    span.in_scope(|| match &res {
157        Ok(_res) => {
158            debug!(
159                ?t_connect,
160                ?t_process,
161                sent = %outcome.num_sent,
162                recv = %outcome.num_recv,
163                "done, ok"
164            );
165        }
166        Err(err) => {
167            debug!(?t_connect, ?t_process, ?err, "done, failed");
168        }
169    });
170
171    let namespace = res?;
172
173    let timings = Timings {
174        connect: t_connect,
175        process: t_process,
176    };
177    let res = SyncFinished {
178        namespace,
179        outcome,
180        peer,
181        timings,
182    };
183
184    Ok(res)
185}
186
187/// Details of a finished sync operation.
188#[derive(Debug, Clone)]
189pub struct SyncFinished {
190    /// The namespace that was synced.
191    pub namespace: NamespaceId,
192    /// The peer we syned with.
193    pub peer: PublicKey,
194    /// The outcome of the sync operation
195    pub outcome: SyncOutcome,
196    /// The time this operation took
197    pub timings: Timings,
198}
199
/// Durations measured for a single sync operation.
#[derive(Clone, Debug, Default)]
pub struct Timings {
    /// Time spent establishing the connection.
    pub connect: Duration,
    /// Time spent running the sync exchange.
    pub process: Duration,
}
208
209/// Errors that may occur on handling incoming sync connections.
210#[derive(thiserror::Error, Debug)]
211#[allow(missing_docs)]
212pub enum AcceptError {
213    /// Failed to establish connection
214    #[error("Failed to establish connection")]
215    Connect {
216        #[source]
217        error: anyhow::Error,
218    },
219    /// Failed to open replica
220    #[error("Failed to open replica with {peer:?}")]
221    Open {
222        peer: PublicKey,
223        #[source]
224        error: anyhow::Error,
225    },
226    /// We aborted the sync request.
227    #[error("Aborted sync of {namespace:?} with {peer:?}: {reason:?}")]
228    Abort {
229        peer: PublicKey,
230        namespace: NamespaceId,
231        reason: AbortReason,
232    },
233    /// Failed to run sync
234    #[error("Failed to sync {namespace:?} with {peer:?}")]
235    Sync {
236        peer: PublicKey,
237        namespace: Option<NamespaceId>,
238        #[source]
239        error: anyhow::Error,
240    },
241    /// Failed to close
242    #[error("Failed to close {namespace:?} with {peer:?}")]
243    Close {
244        peer: PublicKey,
245        namespace: Option<NamespaceId>,
246        #[source]
247        error: anyhow::Error,
248    },
249}
250
251/// Errors that may occur on outgoing sync requests.
252#[derive(thiserror::Error, Debug)]
253#[allow(missing_docs)]
254pub enum ConnectError {
255    /// Failed to establish connection
256    #[error("Failed to establish connection")]
257    Connect {
258        #[source]
259        error: anyhow::Error,
260    },
261    /// The remote peer aborted the sync request.
262    #[error("Remote peer aborted sync: {0:?}")]
263    RemoteAbort(AbortReason),
264    /// Failed to run sync
265    #[error("Failed to sync")]
266    Sync {
267        #[source]
268        error: anyhow::Error,
269    },
270    /// Failed to close
271    #[error("Failed to close connection1")]
272    Close {
273        #[source]
274        error: anyhow::Error,
275    },
276}
277
278/// Reason why we aborted an incoming sync request.
279#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
280pub enum AbortReason {
281    /// Namespace is not available.
282    NotFound,
283    /// We are already syncing this namespace.
284    AlreadySyncing,
285    /// We experienced an error while trying to provide the requested resource
286    InternalServerError,
287}
288
289impl AcceptError {
290    fn connect(error: impl Into<anyhow::Error>) -> Self {
291        Self::Connect {
292            error: error.into(),
293        }
294    }
295    fn open(peer: PublicKey, error: impl Into<anyhow::Error>) -> Self {
296        Self::Open {
297            peer,
298            error: error.into(),
299        }
300    }
301    pub(crate) fn sync(
302        peer: PublicKey,
303        namespace: Option<NamespaceId>,
304        error: impl Into<anyhow::Error>,
305    ) -> Self {
306        Self::Sync {
307            peer,
308            namespace,
309            error: error.into(),
310        }
311    }
312    fn close(
313        peer: PublicKey,
314        namespace: Option<NamespaceId>,
315        error: impl Into<anyhow::Error>,
316    ) -> Self {
317        Self::Close {
318            peer,
319            namespace,
320            error: error.into(),
321        }
322    }
323    /// Get the peer's node ID (if available)
324    pub fn peer(&self) -> Option<PublicKey> {
325        match self {
326            AcceptError::Connect { .. } => None,
327            AcceptError::Open { peer, .. } => Some(*peer),
328            AcceptError::Sync { peer, .. } => Some(*peer),
329            AcceptError::Close { peer, .. } => Some(*peer),
330            AcceptError::Abort { peer, .. } => Some(*peer),
331        }
332    }
333
334    /// Get the namespace (if available)
335    pub fn namespace(&self) -> Option<NamespaceId> {
336        match self {
337            AcceptError::Connect { .. } => None,
338            AcceptError::Open { .. } => None,
339            AcceptError::Sync { namespace, .. } => namespace.to_owned(),
340            AcceptError::Close { namespace, .. } => namespace.to_owned(),
341            AcceptError::Abort { namespace, .. } => Some(*namespace),
342        }
343    }
344}
345
346impl ConnectError {
347    fn connect(error: impl Into<anyhow::Error>) -> Self {
348        Self::Connect {
349            error: error.into(),
350        }
351    }
352    fn close(error: impl Into<anyhow::Error>) -> Self {
353        Self::Close {
354            error: error.into(),
355        }
356    }
357    pub(crate) fn sync(error: impl Into<anyhow::Error>) -> Self {
358        Self::Sync {
359            error: error.into(),
360        }
361    }
362    pub(crate) fn remote_abort(reason: AbortReason) -> Self {
363        Self::RemoteAbort(reason)
364    }
365}