1use std::{collections::HashSet, fmt, sync::Arc, time};
2
3use crossbeam_channel::Receiver;
4use crossbeam_channel::SendError;
5use crossbeam_channel::Sender;
6use radicle::crypto::PublicKey;
7use radicle::node::FetchResult;
8use radicle::node::Seeds;
9use radicle::node::policy::Scope;
10use radicle::node::{Address, Alias, Config, ConnectOptions};
11use radicle::storage::refs;
12use radicle::storage::refs::RefsAt;
13use radicle_core::{NodeId, RepoId};
14use thiserror::Error;
15
16use super::ServiceState;
17
/// Signature of a state-query callback: a closure that receives a borrowed
/// [`ServiceState`] trait object and returns a command [`Result`].
///
/// The `Send + Sync` bounds allow the closure to be shared between threads,
/// which is how it travels inside the `Arc` carried by
/// [`Command::QueryState`].
pub type QueryState = dyn Fn(&dyn ServiceState) -> Result<()> + Send + Sync;
20
/// Result alias for command replies, with the error type fixed to [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
25
/// Sending half of a command's reply channel.
///
/// Wraps a [`Sender`] of `Result<T>`. Every sending method on this type takes
/// `self` by value, so a responder can deliver at most one reply.
#[derive(Debug)]
pub struct Responder<T> {
    /// Channel on which the reply is delivered.
    channel: Sender<Result<T>>,
}
39
40impl<T> Responder<T> {
41 pub fn oneshot() -> (Self, Receiver<Result<T>>) {
43 let (sender, receiver) = crossbeam_channel::bounded(1);
44 (Self { channel: sender }, receiver)
45 }
46
47 pub fn send(self, result: Result<T>) -> std::result::Result<(), SendError<Result<T>>> {
49 self.channel.send(result)
50 }
51
52 pub fn ok(self, value: T) -> std::result::Result<(), SendError<Result<T>>> {
54 self.send(Ok(value))
55 }
56
57 pub fn err<E>(self, error: E) -> std::result::Result<(), SendError<Result<T>>>
59 where
60 E: std::error::Error + Send + Sync + 'static,
61 {
62 self.send(Err(Error::other(error)))
63 }
64}
65
/// Commands that can be submitted to the service.
///
/// Variants that expect a reply carry a [`Responder`] (or, for `Block` and
/// `QueryState`, a raw [`Sender`]) as their last field. How the service acts
/// on each variant is not visible in this file; the notes below describe the
/// payloads only.
pub enum Command {
    /// Repository id, a set of public keys, and the responder for the
    /// resulting `RefsAt`.
    AnnounceRefs(RepoId, HashSet<PublicKey>, Responder<RefsAt>),
    /// No payload and no reply channel.
    AnnounceInventory,
    /// Repository id and a responder for a `bool` reply.
    AddInventory(RepoId, Responder<bool>),
    /// Node id, address and connection options; no reply channel.
    Connect(NodeId, Address, ConnectOptions),
    /// Node id; no reply channel.
    Disconnect(NodeId),
    /// Responder for a `Config` reply.
    Config(Responder<Config>),
    /// Responder for a list of socket addresses.
    ListenAddrs(Responder<Vec<std::net::SocketAddr>>),
    /// Repository id, a set of public keys, and the responder for a `Seeds`
    /// reply.
    Seeds(RepoId, HashSet<PublicKey>, Responder<Seeds>),
    /// Repository id, node id, a duration (presumably a timeout — confirm
    /// against the service), an optional minimum feature level for signed
    /// references, and the responder for the `FetchResult`.
    Fetch(
        RepoId,
        NodeId,
        time::Duration,
        Option<refs::FeatureLevel>,
        Responder<FetchResult>,
    ),
    /// Repository id, scope, and a responder for a `bool` reply.
    Seed(RepoId, Scope, Responder<bool>),
    /// Repository id and a responder for a `bool` reply.
    Unseed(RepoId, Responder<bool>),
    /// Node id, optional alias, and a responder for a `bool` reply.
    Follow(NodeId, Option<Alias>, Responder<bool>),
    /// Node id and a responder for a `bool` reply.
    Unfollow(NodeId, Responder<bool>),
    /// Node id and a plain `Sender<bool>`; unlike most variants the reply is
    /// not wrapped in a [`Result`].
    Block(NodeId, Sender<bool>),
    /// Shared query closure and the sender on which its result is returned.
    QueryState(Arc<QueryState>, Sender<Result<()>>),
}
114
115impl Command {
116 pub fn announce_refs(
117 rid: RepoId,
118 keys: HashSet<PublicKey>,
119 ) -> (Self, Receiver<Result<RefsAt>>) {
120 let (responder, receiver) = Responder::oneshot();
121 (Self::AnnounceRefs(rid, keys, responder), receiver)
122 }
123
124 pub fn announce_inventory() -> Self {
125 Self::AnnounceInventory
126 }
127
128 pub fn add_inventory(rid: RepoId) -> (Self, Receiver<Result<bool>>) {
129 let (responder, receiver) = Responder::oneshot();
130 (Self::AddInventory(rid, responder), receiver)
131 }
132
133 pub fn connect(node_id: NodeId, address: Address, options: ConnectOptions) -> Self {
134 Self::Connect(node_id, address, options)
135 }
136
137 pub fn disconnect(node_id: NodeId) -> Self {
138 Self::Disconnect(node_id)
139 }
140
141 pub fn config() -> (Self, Receiver<Result<Config>>) {
142 let (responder, receiver) = Responder::oneshot();
143 (Self::Config(responder), receiver)
144 }
145
146 pub fn listen_addrs() -> (Self, Receiver<Result<Vec<std::net::SocketAddr>>>) {
147 let (responder, receiver) = Responder::oneshot();
148 (Self::ListenAddrs(responder), receiver)
149 }
150
151 pub fn seeds(rid: RepoId, keys: HashSet<PublicKey>) -> (Self, Receiver<Result<Seeds>>) {
152 let (responder, receiver) = Responder::oneshot();
153 (Self::Seeds(rid, keys, responder), receiver)
154 }
155
156 pub fn fetch(
157 rid: RepoId,
158 node_id: NodeId,
159 duration: time::Duration,
160 signed_references_minimum_feature_level: Option<refs::FeatureLevel>,
161 ) -> (Self, Receiver<Result<FetchResult>>) {
162 let (responder, receiver) = Responder::oneshot();
163 (
164 Self::Fetch(
165 rid,
166 node_id,
167 duration,
168 signed_references_minimum_feature_level,
169 responder,
170 ),
171 receiver,
172 )
173 }
174
175 pub fn seed(rid: RepoId, scope: Scope) -> (Self, Receiver<Result<bool>>) {
176 let (responder, receiver) = Responder::oneshot();
177 (Self::Seed(rid, scope, responder), receiver)
178 }
179
180 pub fn unseed(rid: RepoId) -> (Self, Receiver<Result<bool>>) {
181 let (responder, receiver) = Responder::oneshot();
182 (Self::Unseed(rid, responder), receiver)
183 }
184
185 pub fn follow(node_id: NodeId, alias: Option<Alias>) -> (Self, Receiver<Result<bool>>) {
186 let (responder, receiver) = Responder::oneshot();
187 (Self::Follow(node_id, alias, responder), receiver)
188 }
189
190 pub fn unfollow(node_id: NodeId) -> (Self, Receiver<Result<bool>>) {
191 let (responder, receiver) = Responder::oneshot();
192 (Self::Unfollow(node_id, responder), receiver)
193 }
194
195 pub fn query_state(state: Arc<QueryState>, sender: Sender<Result<()>>) -> Self {
196 Self::QueryState(state, sender)
197 }
198}
199
200impl fmt::Debug for Command {
201 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
202 match self {
203 Self::AnnounceRefs(id, _, _) => write!(f, "AnnounceRefs({id})"),
204 Self::AnnounceInventory => write!(f, "AnnounceInventory"),
205 Self::AddInventory(rid, _) => write!(f, "AddInventory({rid})"),
206 Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
207 Self::Disconnect(id) => write!(f, "Disconnect({id})"),
208 Self::Config(_) => write!(f, "Config"),
209 Self::ListenAddrs(_) => write!(f, "ListenAddrs"),
210 Self::Seeds(id, _, _) => write!(f, "Seeds({id})"),
211 Self::Fetch(id, node, _, feature_level, _) => match feature_level {
212 Some(feature_level) => write!(f, "Fetch({id}, {node} {feature_level})"),
213 None => write!(f, "Fetch({id}, {node})"),
214 },
215 Self::Seed(id, scope, _) => write!(f, "Seed({id}, {scope})"),
216 Self::Unseed(id, _) => write!(f, "Unseed({id})"),
217 Self::Follow(id, _, _) => write!(f, "Follow({id})"),
218 Self::Unfollow(id, _) => write!(f, "Unfollow({id})"),
219 Self::Block(id, _) => write!(f, "Block({id})"),
220 Self::QueryState { .. } => write!(f, "QueryState(..)"),
221 }
222 }
223}
224
/// Error carried by a command [`Result`].
///
/// The enum is `#[non_exhaustive]`, so matches outside the defining crate need
/// a wildcard arm.
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum Error {
    /// Any boxed error. `Display` forwards to the inner error (`"{0}"`), and
    /// the inner error is also exposed as the `source`.
    // NOTE(review): since the inner error is both displayed and returned as
    // the source, reporters that print the whole source chain may show the
    // same message twice — confirm whether `#[error(transparent)]` was meant.
    #[error("{0}")]
    Other(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
}
232
233impl Error {
234 pub(super) fn other<E>(error: E) -> Self
235 where
236 E: std::error::Error + Send + Sync + 'static,
237 {
238 Self::Other(Box::new(error))
239 }
240
241 pub(super) fn custom(message: String) -> Self {
242 Self::other(Custom { message })
243 }
244}
245
/// Message-only error whose `Display` output is exactly `message`.
///
/// Constructed by [`Error::custom`].
#[derive(Debug, Error)]
#[error("{message}")]
struct Custom {
    /// Text rendered by `Display`.
    message: String,
}