Skip to main content

radicle_protocol/
worker.rs

1#![allow(clippy::too_many_arguments)]
2pub mod fetch;
3
4use std::io;
5
6use radicle::identity::RepoId;
7use radicle::node::Event;
8use radicle::prelude::NodeId;
9use radicle::storage::refs::RefsAt;
10
11use radicle::node::events::Emitter;
12
13/// Error returned by fetch.
14#[derive(thiserror::Error, Debug)]
15pub enum FetchError {
16    #[error("the 'git fetch' command failed with exit code '{code}'")]
17    CommandFailed { code: i32 },
18    #[error(transparent)]
19    Io(#[from] io::Error),
20    #[error(transparent)]
21    Fetch(#[from] fetch::error::Fetch),
22    #[error(transparent)]
23    Handle(#[from] fetch::error::Handle),
24    #[error(transparent)]
25    Storage(#[from] radicle::storage::Error),
26    #[error(transparent)]
27    PolicyStore(#[from] radicle::node::policy::store::Error),
28    #[error(transparent)]
29    Policy(#[from] radicle_fetch::policy::error::Policy),
30    #[error(transparent)]
31    Blocked(#[from] radicle_fetch::policy::error::Blocked),
32}
33
34impl FetchError {
35    /// Check if it's a timeout error.
36    pub fn is_timeout(&self) -> bool {
37        matches!(self, FetchError::Io(e) if e.kind() == io::ErrorKind::TimedOut)
38    }
39}
40
41/// Error returned by fetch responder.
42#[derive(thiserror::Error, Debug)]
43pub enum UploadError {
44    #[error("error parsing git command packet-line: {0}")]
45    PacketLine(io::Error),
46    #[error("error while performing git upload-pack: {0}")]
47    UploadPack(io::Error),
48    #[error(transparent)]
49    Authorization(#[from] AuthorizationError),
50}
51
52impl UploadError {
53    /// Check if it's an end-of-file error.
54    pub fn is_eof(&self) -> bool {
55        matches!(self, UploadError::UploadPack(e) if e.kind() == io::ErrorKind::UnexpectedEof)
56    }
57}
58
59#[derive(thiserror::Error, Debug)]
60pub enum AuthorizationError {
61    #[error("{0} is not authorized to fetch {1}")]
62    Unauthorized(NodeId, RepoId),
63    #[error(transparent)]
64    PolicyStore(#[from] radicle::node::policy::store::Error),
65    #[error(transparent)]
66    Repository(#[from] radicle::storage::RepositoryError),
67}
68
69/// Fetch job sent to worker thread.
70#[derive(Debug, Clone)]
71pub enum FetchRequest {
72    /// Client is initiating a fetch for the repository identified by
73    /// `rid` from the peer identified by `remote`.
74    Initiator {
75        /// Repo to fetch.
76        rid: RepoId,
77        /// Remote peer we are interacting with.
78        remote: NodeId,
79        /// If this fetch is for a particular set of `rad/sigrefs`.
80        refs_at: Option<Vec<RefsAt>>,
81        /// [`Config`] options when initiating the fetch protocol.
82        ///
83        /// [`Config`]: radicle_fetch::Config.
84        config: radicle_fetch::Config,
85    },
86    /// Server is responding to a fetch request by uploading the
87    /// specified `refspecs` sent by the client.
88    Responder {
89        /// Remote peer we are interacting with.
90        remote: NodeId,
91        /// Reporter for upload-pack progress.
92        emitter: Emitter<Event>,
93    },
94}
95
96impl FetchRequest {
97    pub fn remote(&self) -> NodeId {
98        match self {
99            Self::Initiator { remote, .. } | Self::Responder { remote, .. } => *remote,
100        }
101    }
102}
103
104/// Fetch result of an upload or fetch.
105#[derive(Debug)]
106pub enum FetchResult {
107    Initiator {
108        /// Repo fetched.
109        rid: RepoId,
110        /// Fetch result, including remotes fetched.
111        result: Result<fetch::FetchResult, FetchError>,
112    },
113    Responder {
114        /// Repo requested.
115        rid: Option<RepoId>,
116        /// Upload result.
117        result: Result<(), UploadError>,
118    },
119}