radicle_protocol/
worker.rs

1#![allow(clippy::too_many_arguments)]
2pub mod fetch;
3
4use std::io;
5
6use radicle::identity::RepoId;
7use radicle::node::Event;
8use radicle::prelude::NodeId;
9use radicle::storage::refs::RefsAt;
10
11// use crate::runtime::{thread, Emitter, Handle};
12
13use radicle::node::events::Emitter;
14
15// pub use channels::{ChannelEvent, Channels, ChannelsConfig};
16
17/// Error returned by fetch.
18#[derive(thiserror::Error, Debug)]
19pub enum FetchError {
20    #[error("the 'git fetch' command failed with exit code '{code}'")]
21    CommandFailed { code: i32 },
22    #[error(transparent)]
23    Io(#[from] io::Error),
24    #[error(transparent)]
25    Fetch(#[from] fetch::error::Fetch),
26    #[error(transparent)]
27    Handle(#[from] fetch::error::Handle),
28    #[error(transparent)]
29    Storage(#[from] radicle::storage::Error),
30    #[error(transparent)]
31    PolicyStore(#[from] radicle::node::policy::store::Error),
32    #[error(transparent)]
33    Policy(#[from] radicle_fetch::policy::error::Policy),
34    #[error(transparent)]
35    Blocked(#[from] radicle_fetch::policy::error::Blocked),
36}
37
38impl FetchError {
39    /// Check if it's a timeout error.
40    pub fn is_timeout(&self) -> bool {
41        matches!(self, FetchError::Io(e) if e.kind() == io::ErrorKind::TimedOut)
42    }
43}
44
45/// Error returned by fetch responder.
46#[derive(thiserror::Error, Debug)]
47pub enum UploadError {
48    #[error("error parsing git command packet-line: {0}")]
49    PacketLine(io::Error),
50    #[error("error while performing git upload-pack: {0}")]
51    UploadPack(io::Error),
52    #[error(transparent)]
53    Authorization(#[from] AuthorizationError),
54}
55
56impl UploadError {
57    /// Check if it's an end-of-file error.
58    pub fn is_eof(&self) -> bool {
59        matches!(self, UploadError::UploadPack(e) if e.kind() == io::ErrorKind::UnexpectedEof)
60    }
61}
62
63#[derive(thiserror::Error, Debug)]
64pub enum AuthorizationError {
65    #[error("{0} is not authorized to fetch {1}")]
66    Unauthorized(NodeId, RepoId),
67    #[error(transparent)]
68    PolicyStore(#[from] radicle::node::policy::store::Error),
69    #[error(transparent)]
70    Repository(#[from] radicle::storage::RepositoryError),
71}
72
73/// Fetch job sent to worker thread.
74#[derive(Debug, Clone)]
75pub enum FetchRequest {
76    /// Client is initiating a fetch for the repository identified by
77    /// `rid` from the peer identified by `remote`.
78    Initiator {
79        /// Repo to fetch.
80        rid: RepoId,
81        /// Remote peer we are interacting with.
82        remote: NodeId,
83        /// If this fetch is for a particular set of `rad/sigrefs`.
84        refs_at: Option<Vec<RefsAt>>,
85    },
86    /// Server is responding to a fetch request by uploading the
87    /// specified `refspecs` sent by the client.
88    Responder {
89        /// Remote peer we are interacting with.
90        remote: NodeId,
91        /// Reporter for upload-pack progress.
92        emitter: Emitter<Event>,
93    },
94}
95
96impl FetchRequest {
97    pub fn remote(&self) -> NodeId {
98        match self {
99            Self::Initiator { remote, .. } | Self::Responder { remote, .. } => *remote,
100        }
101    }
102}
103
104/// Fetch result of an upload or fetch.
105#[derive(Debug)]
106pub enum FetchResult {
107    Initiator {
108        /// Repo fetched.
109        rid: RepoId,
110        /// Fetch result, including remotes fetched.
111        result: Result<fetch::FetchResult, FetchError>,
112    },
113    Responder {
114        /// Repo requested.
115        rid: Option<RepoId>,
116        /// Upload result.
117        result: Result<(), UploadError>,
118    },
119}