1use std::{collections::HashSet, fmt, sync::Arc, time};
2
3use crossbeam_channel::Receiver;
4use crossbeam_channel::SendError;
5use crossbeam_channel::Sender;
6use radicle::crypto::PublicKey;
7use radicle::node::policy::Scope;
8use radicle::node::FetchResult;
9use radicle::node::Seeds;
10use radicle::node::{Address, Alias, Config, ConnectOptions};
11use radicle::storage::refs::RefsAt;
12use radicle_core::{NodeId, RepoId};
13use thiserror::Error;
14
15use super::ServiceState;
16
17pub type QueryState = dyn Fn(&dyn ServiceState) -> Result<()> + Send + Sync;
19
20pub type Result<T> = std::result::Result<T, Error>;
24
25#[derive(Debug)]
35pub struct Responder<T> {
36 channel: Sender<Result<T>>,
37}
38
39impl<T> Responder<T> {
40 pub fn oneshot() -> (Self, Receiver<Result<T>>) {
42 let (sender, receiver) = crossbeam_channel::bounded(1);
43 (Self { channel: sender }, receiver)
44 }
45
46 pub fn send(self, result: Result<T>) -> std::result::Result<(), SendError<Result<T>>> {
48 self.channel.send(result)
49 }
50
51 pub fn ok(self, value: T) -> std::result::Result<(), SendError<Result<T>>> {
53 self.send(Ok(value))
54 }
55
56 pub fn err<E>(self, error: E) -> std::result::Result<(), SendError<Result<T>>>
58 where
59 E: std::error::Error + Send + Sync + 'static,
60 {
61 self.send(Err(Error::other(error)))
62 }
63}
64
65pub enum Command {
75 AnnounceRefs(RepoId, HashSet<PublicKey>, Responder<RefsAt>),
77 AnnounceInventory,
79 AddInventory(RepoId, Responder<bool>),
81 Connect(NodeId, Address, ConnectOptions),
83 Disconnect(NodeId),
85 Config(Responder<Config>),
87 ListenAddrs(Responder<Vec<std::net::SocketAddr>>),
89 Seeds(RepoId, HashSet<PublicKey>, Responder<Seeds>),
92 Fetch(RepoId, NodeId, time::Duration, Responder<FetchResult>),
94 Seed(RepoId, Scope, Responder<bool>),
96 Unseed(RepoId, Responder<bool>),
98 Follow(NodeId, Option<Alias>, Responder<bool>),
100 Unfollow(NodeId, Responder<bool>),
102 Block(NodeId, Sender<bool>),
104 QueryState(Arc<QueryState>, Sender<Result<()>>),
106}
107
108impl Command {
109 pub fn announce_refs(
110 rid: RepoId,
111 keys: HashSet<PublicKey>,
112 ) -> (Self, Receiver<Result<RefsAt>>) {
113 let (responder, receiver) = Responder::oneshot();
114 (Self::AnnounceRefs(rid, keys, responder), receiver)
115 }
116
117 pub fn announce_inventory() -> Self {
118 Self::AnnounceInventory
119 }
120
121 pub fn add_inventory(rid: RepoId) -> (Self, Receiver<Result<bool>>) {
122 let (responder, receiver) = Responder::oneshot();
123 (Self::AddInventory(rid, responder), receiver)
124 }
125
126 pub fn connect(node_id: NodeId, address: Address, options: ConnectOptions) -> Self {
127 Self::Connect(node_id, address, options)
128 }
129
130 pub fn disconnect(node_id: NodeId) -> Self {
131 Self::Disconnect(node_id)
132 }
133
134 pub fn config() -> (Self, Receiver<Result<Config>>) {
135 let (responder, receiver) = Responder::oneshot();
136 (Self::Config(responder), receiver)
137 }
138
139 pub fn listen_addrs() -> (Self, Receiver<Result<Vec<std::net::SocketAddr>>>) {
140 let (responder, receiver) = Responder::oneshot();
141 (Self::ListenAddrs(responder), receiver)
142 }
143
144 pub fn seeds(rid: RepoId, keys: HashSet<PublicKey>) -> (Self, Receiver<Result<Seeds>>) {
145 let (responder, receiver) = Responder::oneshot();
146 (Self::Seeds(rid, keys, responder), receiver)
147 }
148
149 pub fn fetch(
150 rid: RepoId,
151 node_id: NodeId,
152 duration: time::Duration,
153 ) -> (Self, Receiver<Result<FetchResult>>) {
154 let (responder, receiver) = Responder::oneshot();
155 (Self::Fetch(rid, node_id, duration, responder), receiver)
156 }
157
158 pub fn seed(rid: RepoId, scope: Scope) -> (Self, Receiver<Result<bool>>) {
159 let (responder, receiver) = Responder::oneshot();
160 (Self::Seed(rid, scope, responder), receiver)
161 }
162
163 pub fn unseed(rid: RepoId) -> (Self, Receiver<Result<bool>>) {
164 let (responder, receiver) = Responder::oneshot();
165 (Self::Unseed(rid, responder), receiver)
166 }
167
168 pub fn follow(node_id: NodeId, alias: Option<Alias>) -> (Self, Receiver<Result<bool>>) {
169 let (responder, receiver) = Responder::oneshot();
170 (Self::Follow(node_id, alias, responder), receiver)
171 }
172
173 pub fn unfollow(node_id: NodeId) -> (Self, Receiver<Result<bool>>) {
174 let (responder, receiver) = Responder::oneshot();
175 (Self::Unfollow(node_id, responder), receiver)
176 }
177
178 pub fn query_state(state: Arc<QueryState>, sender: Sender<Result<()>>) -> Self {
179 Self::QueryState(state, sender)
180 }
181}
182
183impl fmt::Debug for Command {
184 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
185 match self {
186 Self::AnnounceRefs(id, _, _) => write!(f, "AnnounceRefs({id})"),
187 Self::AnnounceInventory => write!(f, "AnnounceInventory"),
188 Self::AddInventory(rid, _) => write!(f, "AddInventory({rid})"),
189 Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
190 Self::Disconnect(id) => write!(f, "Disconnect({id})"),
191 Self::Config(_) => write!(f, "Config"),
192 Self::ListenAddrs(_) => write!(f, "ListenAddrs"),
193 Self::Seeds(id, _, _) => write!(f, "Seeds({id})"),
194 Self::Fetch(id, node, _, _) => write!(f, "Fetch({id}, {node})"),
195 Self::Seed(id, scope, _) => write!(f, "Seed({id}, {scope})"),
196 Self::Unseed(id, _) => write!(f, "Unseed({id})"),
197 Self::Follow(id, _, _) => write!(f, "Follow({id})"),
198 Self::Unfollow(id, _) => write!(f, "Unfollow({id})"),
199 Self::Block(id, _) => write!(f, "Block({id})"),
200 Self::QueryState { .. } => write!(f, "QueryState(..)"),
201 }
202 }
203}
204
205#[non_exhaustive]
207#[derive(Debug, Error)]
208pub enum Error {
209 #[error("{0}")]
210 Other(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
211}
212
213impl Error {
214 pub(super) fn other<E>(error: E) -> Self
215 where
216 E: std::error::Error + Send + Sync + 'static,
217 {
218 Self::Other(Box::new(error))
219 }
220
221 pub(super) fn custom(message: String) -> Self {
222 Self::other(Custom { message })
223 }
224}
225
226#[derive(Debug, Error)]
227#[error("{message}")]
228struct Custom {
229 message: String,
230}