iroh_docs/
net.rs

1//! Network implementation of the iroh-docs protocol
2
3use std::{
4    future::Future,
5    time::{Duration, Instant},
6};
7
8use iroh::{Endpoint, NodeAddr, PublicKey};
9use serde::{Deserialize, Serialize};
10use tracing::{debug, error_span, trace, Instrument};
11
12use crate::{
13    actor::SyncHandle,
14    metrics::Metrics,
15    net::codec::{run_alice, BobState},
16    NamespaceId, SyncOutcome,
17};
18
/// ALPN protocol identifier negotiated for iroh-docs sync connections.
///
/// On the wire this is the byte string `/iroh-sync/1`.
pub const ALPN: &[u8] = "/iroh-sync/1".as_bytes();
21
22mod codec;
23
24/// Connect to a peer and sync a replica
25pub async fn connect_and_sync(
26    endpoint: &Endpoint,
27    sync: &SyncHandle,
28    namespace: NamespaceId,
29    peer: NodeAddr,
30    metrics: Option<&Metrics>,
31) -> Result<SyncFinished, ConnectError> {
32    let t_start = Instant::now();
33    let peer_id = peer.node_id;
34    trace!("connect");
35    let connection = endpoint
36        .connect(peer, crate::ALPN)
37        .await
38        .map_err(ConnectError::connect)?;
39
40    let (mut send_stream, mut recv_stream) =
41        connection.open_bi().await.map_err(ConnectError::connect)?;
42
43    let t_connect = t_start.elapsed();
44    debug!(?t_connect, "connected");
45
46    let res = run_alice(&mut send_stream, &mut recv_stream, sync, namespace, peer_id).await;
47
48    send_stream.finish().map_err(ConnectError::close)?;
49    send_stream.stopped().await.map_err(ConnectError::close)?;
50    recv_stream
51        .read_to_end(0)
52        .await
53        .map_err(ConnectError::close)?;
54
55    if let Some(metrics) = metrics {
56        if res.is_ok() {
57            metrics.sync_via_connect_success.inc();
58        } else {
59            metrics.sync_via_connect_failure.inc();
60        }
61    }
62
63    let t_process = t_start.elapsed() - t_connect;
64    match &res {
65        Ok(res) => {
66            debug!(
67                ?t_connect,
68                ?t_process,
69                sent = %res.num_sent,
70                recv = %res.num_recv,
71                "done, ok"
72            );
73        }
74        Err(err) => {
75            debug!(?t_connect, ?t_process, ?err, "done, failed");
76        }
77    }
78
79    let outcome = res?;
80
81    let timings = Timings {
82        connect: t_connect,
83        process: t_process,
84    };
85
86    let res = SyncFinished {
87        namespace,
88        peer: peer_id,
89        outcome,
90        timings,
91    };
92
93    Ok(res)
94}
95
96/// Whether we want to accept or reject an incoming sync request.
97#[derive(Debug, Clone)]
98pub enum AcceptOutcome {
99    /// Accept the sync request.
100    Allow,
101    /// Decline the sync request
102    Reject(AbortReason),
103}
104
105/// Handle an iroh-docs connection and sync all shared documents in the replica store.
106pub async fn handle_connection<F, Fut>(
107    sync: SyncHandle,
108    connection: iroh::endpoint::Connection,
109    accept_cb: F,
110    metrics: Option<&Metrics>,
111) -> Result<SyncFinished, AcceptError>
112where
113    F: Fn(NamespaceId, PublicKey) -> Fut,
114    Fut: Future<Output = AcceptOutcome>,
115{
116    let t_start = Instant::now();
117    let peer = connection.remote_node_id().map_err(AcceptError::connect)?;
118    let (mut send_stream, mut recv_stream) = connection
119        .accept_bi()
120        .await
121        .map_err(|e| AcceptError::open(peer, e))?;
122
123    let t_connect = t_start.elapsed();
124    let span = error_span!("accept", peer = %peer.fmt_short(), namespace = tracing::field::Empty);
125    span.in_scope(|| {
126        debug!(?t_connect, "connection established");
127    });
128
129    let mut state = BobState::new(peer);
130    let res = state
131        .run(&mut send_stream, &mut recv_stream, sync, accept_cb)
132        .instrument(span.clone())
133        .await;
134
135    if let Some(metrics) = metrics {
136        if res.is_ok() {
137            metrics.sync_via_accept_success.inc();
138        } else {
139            metrics.sync_via_accept_failure.inc();
140        }
141    }
142
143    let namespace = state.namespace();
144    let outcome = state.into_outcome();
145
146    send_stream
147        .finish()
148        .map_err(|error| AcceptError::close(peer, namespace, error))?;
149    send_stream
150        .stopped()
151        .await
152        .map_err(|error| AcceptError::close(peer, namespace, error))?;
153    recv_stream
154        .read_to_end(0)
155        .await
156        .map_err(|error| AcceptError::close(peer, namespace, error))?;
157
158    let t_process = t_start.elapsed() - t_connect;
159    span.in_scope(|| match &res {
160        Ok(_res) => {
161            debug!(
162                ?t_connect,
163                ?t_process,
164                sent = %outcome.num_sent,
165                recv = %outcome.num_recv,
166                "done, ok"
167            );
168        }
169        Err(err) => {
170            debug!(?t_connect, ?t_process, ?err, "done, failed");
171        }
172    });
173
174    let namespace = res?;
175
176    let timings = Timings {
177        connect: t_connect,
178        process: t_process,
179    };
180    let res = SyncFinished {
181        namespace,
182        outcome,
183        peer,
184        timings,
185    };
186
187    Ok(res)
188}
189
190/// Details of a finished sync operation.
191#[derive(Debug, Clone)]
192pub struct SyncFinished {
193    /// The namespace that was synced.
194    pub namespace: NamespaceId,
195    /// The peer we syned with.
196    pub peer: PublicKey,
197    /// The outcome of the sync operation
198    pub outcome: SyncOutcome,
199    /// The time this operation took
200    pub timings: Timings,
201}
202
/// Durations of the phases of a sync operation.
#[derive(Clone, Debug, Default)]
pub struct Timings {
    /// Time spent establishing the connection.
    pub connect: Duration,
    /// Time spent running the sync exchange.
    pub process: Duration,
}
211
212/// Errors that may occur on handling incoming sync connections.
213#[derive(thiserror::Error, Debug)]
214#[allow(missing_docs)]
215pub enum AcceptError {
216    /// Failed to establish connection
217    #[error("Failed to establish connection")]
218    Connect {
219        #[source]
220        error: anyhow::Error,
221    },
222    /// Failed to open replica
223    #[error("Failed to open replica with {peer:?}")]
224    Open {
225        peer: PublicKey,
226        #[source]
227        error: anyhow::Error,
228    },
229    /// We aborted the sync request.
230    #[error("Aborted sync of {namespace:?} with {peer:?}: {reason:?}")]
231    Abort {
232        peer: PublicKey,
233        namespace: NamespaceId,
234        reason: AbortReason,
235    },
236    /// Failed to run sync
237    #[error("Failed to sync {namespace:?} with {peer:?}")]
238    Sync {
239        peer: PublicKey,
240        namespace: Option<NamespaceId>,
241        #[source]
242        error: anyhow::Error,
243    },
244    /// Failed to close
245    #[error("Failed to close {namespace:?} with {peer:?}")]
246    Close {
247        peer: PublicKey,
248        namespace: Option<NamespaceId>,
249        #[source]
250        error: anyhow::Error,
251    },
252}
253
254/// Errors that may occur on outgoing sync requests.
255#[derive(thiserror::Error, Debug)]
256#[allow(missing_docs)]
257pub enum ConnectError {
258    /// Failed to establish connection
259    #[error("Failed to establish connection")]
260    Connect {
261        #[source]
262        error: anyhow::Error,
263    },
264    /// The remote peer aborted the sync request.
265    #[error("Remote peer aborted sync: {0:?}")]
266    RemoteAbort(AbortReason),
267    /// Failed to run sync
268    #[error("Failed to sync")]
269    Sync {
270        #[source]
271        error: anyhow::Error,
272    },
273    /// Failed to close
274    #[error("Failed to close connection1")]
275    Close {
276        #[source]
277        error: anyhow::Error,
278    },
279}
280
281/// Reason why we aborted an incoming sync request.
282#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
283pub enum AbortReason {
284    /// Namespace is not available.
285    NotFound,
286    /// We are already syncing this namespace.
287    AlreadySyncing,
288    /// We experienced an error while trying to provide the requested resource
289    InternalServerError,
290}
291
292impl AcceptError {
293    fn connect(error: impl Into<anyhow::Error>) -> Self {
294        Self::Connect {
295            error: error.into(),
296        }
297    }
298    fn open(peer: PublicKey, error: impl Into<anyhow::Error>) -> Self {
299        Self::Open {
300            peer,
301            error: error.into(),
302        }
303    }
304    pub(crate) fn sync(
305        peer: PublicKey,
306        namespace: Option<NamespaceId>,
307        error: impl Into<anyhow::Error>,
308    ) -> Self {
309        Self::Sync {
310            peer,
311            namespace,
312            error: error.into(),
313        }
314    }
315    fn close(
316        peer: PublicKey,
317        namespace: Option<NamespaceId>,
318        error: impl Into<anyhow::Error>,
319    ) -> Self {
320        Self::Close {
321            peer,
322            namespace,
323            error: error.into(),
324        }
325    }
326    /// Get the peer's node ID (if available)
327    pub fn peer(&self) -> Option<PublicKey> {
328        match self {
329            AcceptError::Connect { .. } => None,
330            AcceptError::Open { peer, .. } => Some(*peer),
331            AcceptError::Sync { peer, .. } => Some(*peer),
332            AcceptError::Close { peer, .. } => Some(*peer),
333            AcceptError::Abort { peer, .. } => Some(*peer),
334        }
335    }
336
337    /// Get the namespace (if available)
338    pub fn namespace(&self) -> Option<NamespaceId> {
339        match self {
340            AcceptError::Connect { .. } => None,
341            AcceptError::Open { .. } => None,
342            AcceptError::Sync { namespace, .. } => namespace.to_owned(),
343            AcceptError::Close { namespace, .. } => namespace.to_owned(),
344            AcceptError::Abort { namespace, .. } => Some(*namespace),
345        }
346    }
347}
348
349impl ConnectError {
350    fn connect(error: impl Into<anyhow::Error>) -> Self {
351        Self::Connect {
352            error: error.into(),
353        }
354    }
355    fn close(error: impl Into<anyhow::Error>) -> Self {
356        Self::Close {
357            error: error.into(),
358        }
359    }
360    pub(crate) fn sync(error: impl Into<anyhow::Error>) -> Self {
361        Self::Sync {
362            error: error.into(),
363        }
364    }
365    pub(crate) fn remote_abort(reason: AbortReason) -> Self {
366        Self::RemoteAbort(reason)
367    }
368}