Skip to main content

radicle_protocol/service/
command.rs

1use std::{collections::HashSet, fmt, sync::Arc, time};
2
3use crossbeam_channel::Receiver;
4use crossbeam_channel::SendError;
5use crossbeam_channel::Sender;
6use radicle::crypto::PublicKey;
7use radicle::node::FetchResult;
8use radicle::node::Seeds;
9use radicle::node::policy::Scope;
10use radicle::node::{Address, Alias, Config, ConnectOptions};
11use radicle::storage::refs;
12use radicle::storage::refs::RefsAt;
13use radicle_core::{NodeId, RepoId};
14use thiserror::Error;
15
16use super::ServiceState;
17
18/// Function used to query internal service state.
19pub type QueryState = dyn Fn(&dyn ServiceState) -> Result<()> + Send + Sync;
20
21/// A result returned from processing a [`Command`].
22///
23/// It is a type synonym for a [`std::result::Result`]
24pub type Result<T> = std::result::Result<T, Error>;
25
26/// A [`Responder`] returns results after processing a service [`Command`].
27///
28/// To construct a [`Responder`], use [`Responder::oneshot`], which also returns its
29/// corresponding [`Receiver`].
30///
31/// To send results, use either:
32/// - [`Responder::send`]
33/// - [`Responder::ok`]
34/// - [`Responder::err`]
35#[derive(Debug)]
36pub struct Responder<T> {
37    channel: Sender<Result<T>>,
38}
39
40impl<T> Responder<T> {
41    /// Construct a new [`Responder`] and its corresponding [`Receiver`].
42    pub fn oneshot() -> (Self, Receiver<Result<T>>) {
43        let (sender, receiver) = crossbeam_channel::bounded(1);
44        (Self { channel: sender }, receiver)
45    }
46
47    /// Send a [`Result`] to the receiver.
48    pub fn send(self, result: Result<T>) -> std::result::Result<(), SendError<Result<T>>> {
49        self.channel.send(result)
50    }
51
52    /// Send a [`Result::Ok`] to the receiver.
53    pub fn ok(self, value: T) -> std::result::Result<(), SendError<Result<T>>> {
54        self.send(Ok(value))
55    }
56
57    /// Send a [`Result::Err`] to the receiver.
58    pub fn err<E>(self, error: E) -> std::result::Result<(), SendError<Result<T>>>
59    where
60        E: std::error::Error + Send + Sync + 'static,
61    {
62        self.send(Err(Error::other(error)))
63    }
64}
65
66/// Commands sent to the service by the operator.
67///
68/// Each variant has a corresponding helper constructor, e.g. [`Command::Seed`]
69/// and [`Command::seed`]. These constructors will hide the construction of the
70/// [`Responder`], and return the corresponding [`Receiver`] to receive the
71/// result of the command process.
72///
73/// If the command does not return a [`Responder`], then it will only return the
74/// [`Command`] variant, e.g. [`Command::AnnounceInventory`].
75pub enum Command {
76    /// Announce repository references for given repository and namespaces to peers.
77    AnnounceRefs(RepoId, HashSet<PublicKey>, Responder<RefsAt>),
78    /// Announce local repositories to peers.
79    AnnounceInventory,
80    /// Add repository to local inventory.
81    AddInventory(RepoId, Responder<bool>),
82    /// Connect to node with the given address.
83    Connect(NodeId, Address, ConnectOptions),
84    /// Disconnect from node.
85    Disconnect(NodeId),
86    /// Get the node configuration.
87    Config(Responder<Config>),
88    /// Get the node's listen addresses.
89    ListenAddrs(Responder<Vec<std::net::SocketAddr>>),
90    /// Lookup seeds for the given repository in the routing table, and report
91    /// sync status for given namespaces.
92    Seeds(RepoId, HashSet<PublicKey>, Responder<Seeds>),
93    /// Fetch the given repository from the network.
94    Fetch(
95        RepoId,
96        NodeId,
97        time::Duration,
98        Option<refs::FeatureLevel>,
99        Responder<FetchResult>,
100    ),
101    /// Seed the given repository.
102    Seed(RepoId, Scope, Responder<bool>),
103    /// Unseed the given repository.
104    Unseed(RepoId, Responder<bool>),
105    /// Follow the given node.
106    Follow(NodeId, Option<Alias>, Responder<bool>),
107    /// Unfollow the given node.
108    Unfollow(NodeId, Responder<bool>),
109    /// Block the given node.
110    Block(NodeId, Sender<bool>),
111    /// Query the internal service state.
112    QueryState(Arc<QueryState>, Sender<Result<()>>),
113}
114
115impl Command {
116    pub fn announce_refs(
117        rid: RepoId,
118        keys: HashSet<PublicKey>,
119    ) -> (Self, Receiver<Result<RefsAt>>) {
120        let (responder, receiver) = Responder::oneshot();
121        (Self::AnnounceRefs(rid, keys, responder), receiver)
122    }
123
124    pub fn announce_inventory() -> Self {
125        Self::AnnounceInventory
126    }
127
128    pub fn add_inventory(rid: RepoId) -> (Self, Receiver<Result<bool>>) {
129        let (responder, receiver) = Responder::oneshot();
130        (Self::AddInventory(rid, responder), receiver)
131    }
132
133    pub fn connect(node_id: NodeId, address: Address, options: ConnectOptions) -> Self {
134        Self::Connect(node_id, address, options)
135    }
136
137    pub fn disconnect(node_id: NodeId) -> Self {
138        Self::Disconnect(node_id)
139    }
140
141    pub fn config() -> (Self, Receiver<Result<Config>>) {
142        let (responder, receiver) = Responder::oneshot();
143        (Self::Config(responder), receiver)
144    }
145
146    pub fn listen_addrs() -> (Self, Receiver<Result<Vec<std::net::SocketAddr>>>) {
147        let (responder, receiver) = Responder::oneshot();
148        (Self::ListenAddrs(responder), receiver)
149    }
150
151    pub fn seeds(rid: RepoId, keys: HashSet<PublicKey>) -> (Self, Receiver<Result<Seeds>>) {
152        let (responder, receiver) = Responder::oneshot();
153        (Self::Seeds(rid, keys, responder), receiver)
154    }
155
156    pub fn fetch(
157        rid: RepoId,
158        node_id: NodeId,
159        duration: time::Duration,
160        signed_references_minimum_feature_level: Option<refs::FeatureLevel>,
161    ) -> (Self, Receiver<Result<FetchResult>>) {
162        let (responder, receiver) = Responder::oneshot();
163        (
164            Self::Fetch(
165                rid,
166                node_id,
167                duration,
168                signed_references_minimum_feature_level,
169                responder,
170            ),
171            receiver,
172        )
173    }
174
175    pub fn seed(rid: RepoId, scope: Scope) -> (Self, Receiver<Result<bool>>) {
176        let (responder, receiver) = Responder::oneshot();
177        (Self::Seed(rid, scope, responder), receiver)
178    }
179
180    pub fn unseed(rid: RepoId) -> (Self, Receiver<Result<bool>>) {
181        let (responder, receiver) = Responder::oneshot();
182        (Self::Unseed(rid, responder), receiver)
183    }
184
185    pub fn follow(node_id: NodeId, alias: Option<Alias>) -> (Self, Receiver<Result<bool>>) {
186        let (responder, receiver) = Responder::oneshot();
187        (Self::Follow(node_id, alias, responder), receiver)
188    }
189
190    pub fn unfollow(node_id: NodeId) -> (Self, Receiver<Result<bool>>) {
191        let (responder, receiver) = Responder::oneshot();
192        (Self::Unfollow(node_id, responder), receiver)
193    }
194
195    pub fn query_state(state: Arc<QueryState>, sender: Sender<Result<()>>) -> Self {
196        Self::QueryState(state, sender)
197    }
198}
199
200impl fmt::Debug for Command {
201    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
202        match self {
203            Self::AnnounceRefs(id, _, _) => write!(f, "AnnounceRefs({id})"),
204            Self::AnnounceInventory => write!(f, "AnnounceInventory"),
205            Self::AddInventory(rid, _) => write!(f, "AddInventory({rid})"),
206            Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
207            Self::Disconnect(id) => write!(f, "Disconnect({id})"),
208            Self::Config(_) => write!(f, "Config"),
209            Self::ListenAddrs(_) => write!(f, "ListenAddrs"),
210            Self::Seeds(id, _, _) => write!(f, "Seeds({id})"),
211            Self::Fetch(id, node, _, feature_level, _) => match feature_level {
212                Some(feature_level) => write!(f, "Fetch({id}, {node} {feature_level})"),
213                None => write!(f, "Fetch({id}, {node})"),
214            },
215            Self::Seed(id, scope, _) => write!(f, "Seed({id}, {scope})"),
216            Self::Unseed(id, _) => write!(f, "Unseed({id})"),
217            Self::Follow(id, _, _) => write!(f, "Follow({id})"),
218            Self::Unfollow(id, _) => write!(f, "Unfollow({id})"),
219            Self::Block(id, _) => write!(f, "Block({id})"),
220            Self::QueryState { .. } => write!(f, "QueryState(..)"),
221        }
222    }
223}
224
225/// An error that occurred when processing a service [`Command`].
226#[non_exhaustive]
227#[derive(Debug, Error)]
228pub enum Error {
229    #[error("{0}")]
230    Other(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
231}
232
233impl Error {
234    pub(super) fn other<E>(error: E) -> Self
235    where
236        E: std::error::Error + Send + Sync + 'static,
237    {
238        Self::Other(Box::new(error))
239    }
240
241    pub(super) fn custom(message: String) -> Self {
242        Self::other(Custom { message })
243    }
244}
245
246#[derive(Debug, Error)]
247#[error("{message}")]
248struct Custom {
249    message: String,
250}