Skip to main content

radicle_protocol/service/
command.rs

1use std::{collections::HashSet, fmt, sync::Arc, time};
2
3use crossbeam_channel::Receiver;
4use crossbeam_channel::SendError;
5use crossbeam_channel::Sender;
6use radicle::crypto::PublicKey;
7use radicle::node::policy::Scope;
8use radicle::node::FetchResult;
9use radicle::node::Seeds;
10use radicle::node::{Address, Alias, Config, ConnectOptions};
11use radicle::storage::refs::RefsAt;
12use radicle_core::{NodeId, RepoId};
13use thiserror::Error;
14
15use super::ServiceState;
16
17/// Function used to query internal service state.
18pub type QueryState = dyn Fn(&dyn ServiceState) -> Result<()> + Send + Sync;
19
20/// A result returned from processing a [`Command`].
21///
22/// It is a type synonym for a [`std::result::Result`]
23pub type Result<T> = std::result::Result<T, Error>;
24
25/// A [`Responder`] returns results after processing a service [`Command`].
26///
27/// To construct a [`Responder`], use [`Responder::oneshot`], which also returns its
28/// corresponding [`Receiver`].
29///
30/// To send results, use either:
31/// - [`Responder::send`]
32/// - [`Responder::ok`]
33/// - [`Responder::err`]
34#[derive(Debug)]
35pub struct Responder<T> {
36    channel: Sender<Result<T>>,
37}
38
39impl<T> Responder<T> {
40    /// Construct a new [`Responder`] and its corresponding [`Receiver`].
41    pub fn oneshot() -> (Self, Receiver<Result<T>>) {
42        let (sender, receiver) = crossbeam_channel::bounded(1);
43        (Self { channel: sender }, receiver)
44    }
45
46    /// Send a [`Result`] to the receiver.
47    pub fn send(self, result: Result<T>) -> std::result::Result<(), SendError<Result<T>>> {
48        self.channel.send(result)
49    }
50
51    /// Send a [`Result::Ok`] to the receiver.
52    pub fn ok(self, value: T) -> std::result::Result<(), SendError<Result<T>>> {
53        self.send(Ok(value))
54    }
55
56    /// Send a [`Result::Err`] to the receiver.
57    pub fn err<E>(self, error: E) -> std::result::Result<(), SendError<Result<T>>>
58    where
59        E: std::error::Error + Send + Sync + 'static,
60    {
61        self.send(Err(Error::other(error)))
62    }
63}
64
65/// Commands sent to the service by the operator.
66///
67/// Each variant has a corresponding helper constructor, e.g. [`Command::Seed`]
68/// and [`Command::seed`]. These constructors will hide the construction of the
69/// [`Responder`], and return the corresponding [`Receiver`] to receive the
70/// result of the command process.
71///
72/// If the command does not return a [`Responder`], then it will only return the
73/// [`Command`] variant, e.g. [`Command::AnnounceInventory`].
74pub enum Command {
75    /// Announce repository references for given repository and namespaces to peers.
76    AnnounceRefs(RepoId, HashSet<PublicKey>, Responder<RefsAt>),
77    /// Announce local repositories to peers.
78    AnnounceInventory,
79    /// Add repository to local inventory.
80    AddInventory(RepoId, Responder<bool>),
81    /// Connect to node with the given address.
82    Connect(NodeId, Address, ConnectOptions),
83    /// Disconnect from node.
84    Disconnect(NodeId),
85    /// Get the node configuration.
86    Config(Responder<Config>),
87    /// Get the node's listen addresses.
88    ListenAddrs(Responder<Vec<std::net::SocketAddr>>),
89    /// Lookup seeds for the given repository in the routing table, and report
90    /// sync status for given namespaces.
91    Seeds(RepoId, HashSet<PublicKey>, Responder<Seeds>),
92    /// Fetch the given repository from the network.
93    Fetch(RepoId, NodeId, time::Duration, Responder<FetchResult>),
94    /// Seed the given repository.
95    Seed(RepoId, Scope, Responder<bool>),
96    /// Unseed the given repository.
97    Unseed(RepoId, Responder<bool>),
98    /// Follow the given node.
99    Follow(NodeId, Option<Alias>, Responder<bool>),
100    /// Unfollow the given node.
101    Unfollow(NodeId, Responder<bool>),
102    /// Block the given node.
103    Block(NodeId, Sender<bool>),
104    /// Query the internal service state.
105    QueryState(Arc<QueryState>, Sender<Result<()>>),
106}
107
108impl Command {
109    pub fn announce_refs(
110        rid: RepoId,
111        keys: HashSet<PublicKey>,
112    ) -> (Self, Receiver<Result<RefsAt>>) {
113        let (responder, receiver) = Responder::oneshot();
114        (Self::AnnounceRefs(rid, keys, responder), receiver)
115    }
116
117    pub fn announce_inventory() -> Self {
118        Self::AnnounceInventory
119    }
120
121    pub fn add_inventory(rid: RepoId) -> (Self, Receiver<Result<bool>>) {
122        let (responder, receiver) = Responder::oneshot();
123        (Self::AddInventory(rid, responder), receiver)
124    }
125
126    pub fn connect(node_id: NodeId, address: Address, options: ConnectOptions) -> Self {
127        Self::Connect(node_id, address, options)
128    }
129
130    pub fn disconnect(node_id: NodeId) -> Self {
131        Self::Disconnect(node_id)
132    }
133
134    pub fn config() -> (Self, Receiver<Result<Config>>) {
135        let (responder, receiver) = Responder::oneshot();
136        (Self::Config(responder), receiver)
137    }
138
139    pub fn listen_addrs() -> (Self, Receiver<Result<Vec<std::net::SocketAddr>>>) {
140        let (responder, receiver) = Responder::oneshot();
141        (Self::ListenAddrs(responder), receiver)
142    }
143
144    pub fn seeds(rid: RepoId, keys: HashSet<PublicKey>) -> (Self, Receiver<Result<Seeds>>) {
145        let (responder, receiver) = Responder::oneshot();
146        (Self::Seeds(rid, keys, responder), receiver)
147    }
148
149    pub fn fetch(
150        rid: RepoId,
151        node_id: NodeId,
152        duration: time::Duration,
153    ) -> (Self, Receiver<Result<FetchResult>>) {
154        let (responder, receiver) = Responder::oneshot();
155        (Self::Fetch(rid, node_id, duration, responder), receiver)
156    }
157
158    pub fn seed(rid: RepoId, scope: Scope) -> (Self, Receiver<Result<bool>>) {
159        let (responder, receiver) = Responder::oneshot();
160        (Self::Seed(rid, scope, responder), receiver)
161    }
162
163    pub fn unseed(rid: RepoId) -> (Self, Receiver<Result<bool>>) {
164        let (responder, receiver) = Responder::oneshot();
165        (Self::Unseed(rid, responder), receiver)
166    }
167
168    pub fn follow(node_id: NodeId, alias: Option<Alias>) -> (Self, Receiver<Result<bool>>) {
169        let (responder, receiver) = Responder::oneshot();
170        (Self::Follow(node_id, alias, responder), receiver)
171    }
172
173    pub fn unfollow(node_id: NodeId) -> (Self, Receiver<Result<bool>>) {
174        let (responder, receiver) = Responder::oneshot();
175        (Self::Unfollow(node_id, responder), receiver)
176    }
177
178    pub fn query_state(state: Arc<QueryState>, sender: Sender<Result<()>>) -> Self {
179        Self::QueryState(state, sender)
180    }
181}
182
183impl fmt::Debug for Command {
184    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
185        match self {
186            Self::AnnounceRefs(id, _, _) => write!(f, "AnnounceRefs({id})"),
187            Self::AnnounceInventory => write!(f, "AnnounceInventory"),
188            Self::AddInventory(rid, _) => write!(f, "AddInventory({rid})"),
189            Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
190            Self::Disconnect(id) => write!(f, "Disconnect({id})"),
191            Self::Config(_) => write!(f, "Config"),
192            Self::ListenAddrs(_) => write!(f, "ListenAddrs"),
193            Self::Seeds(id, _, _) => write!(f, "Seeds({id})"),
194            Self::Fetch(id, node, _, _) => write!(f, "Fetch({id}, {node})"),
195            Self::Seed(id, scope, _) => write!(f, "Seed({id}, {scope})"),
196            Self::Unseed(id, _) => write!(f, "Unseed({id})"),
197            Self::Follow(id, _, _) => write!(f, "Follow({id})"),
198            Self::Unfollow(id, _) => write!(f, "Unfollow({id})"),
199            Self::Block(id, _) => write!(f, "Block({id})"),
200            Self::QueryState { .. } => write!(f, "QueryState(..)"),
201        }
202    }
203}
204
205/// An error that occurred when processing a service [`Command`].
206#[non_exhaustive]
207#[derive(Debug, Error)]
208pub enum Error {
209    #[error("{0}")]
210    Other(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
211}
212
213impl Error {
214    pub(super) fn other<E>(error: E) -> Self
215    where
216        E: std::error::Error + Send + Sync + 'static,
217    {
218        Self::Other(Box::new(error))
219    }
220
221    pub(super) fn custom(message: String) -> Self {
222        Self::other(Custom { message })
223    }
224}
225
226#[derive(Debug, Error)]
227#[error("{message}")]
228struct Custom {
229    message: String,
230}