1use std::collections::HashSet;
2use std::net;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::{fmt, io, time};
6
7#[cfg(unix)]
8use std::os::unix::net::UnixStream;
9#[cfg(windows)]
10use uds_windows::UnixStream;
11
12use crossbeam_channel as chan;
13use radicle::crypto::PublicKey;
14use radicle::node::events::{Event, Events};
15use radicle::node::policy;
16use radicle::node::{Config, NodeId};
17use radicle::node::{ConnectOptions, ConnectResult, Seeds};
18use serde_json::json;
19use thiserror::Error;
20
21use crate::identity::RepoId;
22use crate::node::{Alias, Command, FetchResult};
23use crate::profile::Home;
24use crate::reactor;
25use crate::runtime::Emitter;
26use crate::service;
27use crate::service::QueryState;
28use crate::storage::refs::RefsAt;
29use crate::wire;
30use crate::wire::StreamId;
31use crate::worker::TaskResult;
32
33#[derive(Error, Debug)]
35pub enum Error {
36 #[error("command channel is not connected")]
38 ChannelDisconnected,
39 #[error("command failed: {0}")]
41 Command(#[from] service::command::Error),
42 #[error("the operation timed out")]
44 Timeout,
45 #[error(transparent)]
47 Io(#[from] std::io::Error),
48}
49
50impl From<chan::RecvError> for Error {
51 fn from(_: chan::RecvError) -> Self {
52 Self::ChannelDisconnected
53 }
54}
55
56impl From<chan::RecvTimeoutError> for Error {
57 fn from(err: chan::RecvTimeoutError) -> Self {
58 match err {
59 chan::RecvTimeoutError::Timeout => Self::Timeout,
60 chan::RecvTimeoutError::Disconnected => Self::ChannelDisconnected,
61 }
62 }
63}
64
65impl<T> From<chan::SendError<T>> for Error {
66 fn from(_: chan::SendError<T>) -> Self {
67 Self::ChannelDisconnected
68 }
69}
70
/// Handle used to control the node.
///
/// Control messages are sent through the reactor controller, and node events
/// are observed through subscriptions created from the event emitter.
pub struct Handle {
    /// Node home. `shutdown` connects to the socket path returned by
    /// `home.socket()`.
    pub(crate) home: Home,
    /// Reactor controller through which control messages are sent.
    pub(crate) controller: reactor::Controller,

    /// Set to `true` by the first call to `shutdown`. The flag is shared
    /// between clones of the handle, so later calls return early.
    shutdown: Arc<AtomicBool>,
    /// Event emitter from which `events()` creates new subscriptions.
    emitter: Emitter<Event>,
}
80
81impl Handle {
82 pub fn events(&self) -> Events {
84 Events::from(self.emitter.subscribe())
85 }
86}
87
88impl fmt::Debug for Handle {
89 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90 f.debug_struct("Handle").field("home", &self.home).finish()
91 }
92}
93
94impl Clone for Handle {
95 fn clone(&self) -> Self {
96 Self {
97 home: self.home.clone(),
98 controller: self.controller.clone(),
99 shutdown: self.shutdown.clone(),
100 emitter: self.emitter.clone(),
101 }
102 }
103}
104
105impl Handle {
106 pub fn new(home: Home, controller: reactor::Controller, emitter: Emitter<Event>) -> Self {
107 Self {
108 home,
109 controller,
110 shutdown: Arc::default(),
111 emitter,
112 }
113 }
114
115 pub fn worker_result(&mut self, result: TaskResult) -> Result<(), io::Error> {
116 self.controller.cmd(wire::Control::Worker(result))
117 }
118
119 pub fn flush(&mut self, remote: NodeId, stream: StreamId) -> Result<(), io::Error> {
120 self.controller.cmd(wire::Control::Flush { remote, stream })
121 }
122
123 pub(crate) fn command(&self, cmd: service::Command) -> Result<(), io::Error> {
124 self.controller.cmd(wire::Control::User(cmd))
125 }
126}
127
128impl radicle::node::Handle for Handle {
129 type Sessions = Vec<radicle::node::Session>;
130 type Events = Events;
131 type Event = Event;
132 type Error = Error;
133
134 fn nid(&self) -> Result<NodeId, Self::Error> {
135 let (sender, receiver) = chan::bounded(1);
136 let query: Arc<QueryState> = Arc::new(move |state| {
137 sender.send(*state.nid()).ok();
138 Ok(())
139 });
140 let (err_sender, err_receiver) = chan::bounded(1);
141 self.command(service::Command::QueryState(query, err_sender))?;
142 err_receiver.recv()??;
143
144 let nid = receiver.recv()?;
145
146 Ok(nid)
147 }
148
149 fn is_running(&self) -> bool {
150 true
151 }
152
153 fn connect(
154 &mut self,
155 node: NodeId,
156 addr: radicle::node::Address,
157 opts: ConnectOptions,
158 ) -> Result<ConnectResult, Error> {
159 let events = self.events();
160 let timeout = opts.timeout;
161 let sessions = self.sessions()?;
162 let session = sessions.iter().find(|s| s.nid == node);
163
164 if let Some(s) = session {
165 if s.state.is_connected() {
166 return Ok(ConnectResult::Connected);
167 }
168 }
169 self.command(service::Command::Connect(node, addr, opts))?;
170
171 events
172 .wait(
173 |e| match e {
174 Event::PeerConnected { nid } if nid == &node => Some(ConnectResult::Connected),
175 Event::PeerDisconnected { nid, reason } if nid == &node => {
176 Some(ConnectResult::Disconnected {
177 reason: reason.clone(),
178 })
179 }
180 _ => None,
181 },
182 timeout,
183 )
184 .map_err(Error::from)
185 }
186
187 fn disconnect(&mut self, node: NodeId) -> Result<(), Self::Error> {
188 let events = self.events();
189 self.command(service::Command::Disconnect(node))?;
190 events
191 .wait(
192 |e| match e {
193 Event::PeerDisconnected { nid, .. } if nid == &node => Some(()),
194 _ => None,
195 },
196 time::Duration::MAX,
197 )
198 .map_err(Error::from)
199 }
200
201 fn seeds_for(
202 &mut self,
203 id: RepoId,
204 namespaces: impl IntoIterator<Item = PublicKey>,
205 ) -> Result<Seeds, Self::Error> {
206 let (responder, receiver) = service::command::Responder::oneshot();
207 self.command(service::Command::Seeds(
208 id,
209 HashSet::from_iter(namespaces),
210 responder,
211 ))?;
212 Ok(receiver.recv()??)
213 }
214
215 fn config(&self) -> Result<Config, Self::Error> {
216 let (responder, receiver) = service::command::Responder::oneshot();
217 self.command(service::Command::Config(responder))?;
218 Ok(receiver.recv()??)
219 }
220
221 fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Self::Error> {
222 let (responder, receiver) = service::command::Responder::oneshot();
223 self.command(service::Command::ListenAddrs(responder))?;
224 Ok(receiver.recv()??)
225 }
226
227 fn fetch(
228 &mut self,
229 id: RepoId,
230 from: NodeId,
231 timeout: time::Duration,
232 ) -> Result<FetchResult, Error> {
233 let (responder, receiver) = service::command::Responder::oneshot();
234 self.command(service::Command::Fetch(id, from, timeout, responder))?;
235 Ok(receiver.recv()??)
236 }
237
238 fn follow(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
239 let (responder, receiver) = service::command::Responder::oneshot();
240 self.command(service::Command::Follow(id, alias, responder))?;
241 Ok(receiver.recv()??)
242 }
243
244 fn unfollow(&mut self, id: NodeId) -> Result<bool, Error> {
245 let (responder, receiver) = service::command::Responder::oneshot();
246 self.command(service::Command::Unfollow(id, responder))?;
247 Ok(receiver.recv()??)
248 }
249
250 fn block(&mut self, id: NodeId) -> Result<bool, Self::Error> {
251 let (sender, receiver) = chan::bounded(1);
252 self.command(service::Command::Block(id, sender))?;
253 receiver.recv().map_err(Error::from)
254 }
255
256 fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Error> {
257 let (responder, receiver) = service::command::Responder::oneshot();
258 self.command(service::Command::Seed(id, scope, responder))?;
259 Ok(receiver.recv()??)
260 }
261
262 fn unseed(&mut self, id: RepoId) -> Result<bool, Error> {
263 let (responder, receiver) = service::command::Responder::oneshot();
264 self.command(service::Command::Unseed(id, responder))?;
265 Ok(receiver.recv()??)
266 }
267
268 fn announce_refs_for(
269 &mut self,
270 id: RepoId,
271 namespaces: impl IntoIterator<Item = PublicKey>,
272 ) -> Result<RefsAt, Error> {
273 let (responder, receiver) = service::command::Responder::oneshot();
274 self.command(service::Command::AnnounceRefs(
275 id,
276 HashSet::from_iter(namespaces),
277 responder,
278 ))?;
279 Ok(receiver.recv()??)
280 }
281
282 fn announce_inventory(&mut self) -> Result<(), Error> {
283 self.command(service::Command::AnnounceInventory)
284 .map_err(Error::from)
285 }
286
287 fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
288 let (responder, receiver) = service::command::Responder::oneshot();
289 self.command(service::Command::AddInventory(rid, responder))?;
290 Ok(receiver.recv()??)
291 }
292
293 fn subscribe(&self, _timeout: time::Duration) -> Result<Self::Events, Self::Error> {
294 Ok(self.events())
295 }
296
297 fn sessions(&self) -> Result<Self::Sessions, Error> {
298 let (sender, receiver) = chan::unbounded();
299 let query: Arc<QueryState> = Arc::new(move |state| {
300 let sessions = state
301 .sessions()
302 .values()
303 .map(radicle::node::Session::from)
304 .collect();
305 sender.send(sessions).ok();
306
307 Ok(())
308 });
309 let (err_sender, err_receiver) = chan::bounded(1);
310 self.command(service::Command::QueryState(query, err_sender))?;
311 err_receiver.recv()??;
312
313 let sessions = receiver.recv()?;
314
315 Ok(sessions)
316 }
317
318 fn session(&self, nid: NodeId) -> Result<Option<radicle::node::Session>, Self::Error> {
319 let (sender, receiver) = chan::bounded(1);
320 let query: Arc<QueryState> = Arc::new(move |state| {
321 let session = state.sessions().get(&nid).map(radicle::node::Session::from);
322 sender.send(session).ok();
323
324 Ok(())
325 });
326 let (err_sender, err_receiver) = chan::bounded(1);
327 self.command(service::Command::QueryState(query, err_sender))?;
328 err_receiver.recv()??;
329
330 let sessions = receiver.recv()?;
331
332 Ok(sessions)
333 }
334
335 fn shutdown(self) -> Result<(), Error> {
336 if self
338 .shutdown
339 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
340 .is_err()
341 {
342 return Ok(());
343 }
344 UnixStream::connect(self.home.socket())
348 .and_then(|sock| Command::Shutdown.to_writer(sock))
349 .ok();
350
351 self.controller
352 .shutdown()
353 .map_err(|_| Error::ChannelDisconnected)
354 }
355
356 fn debug(&self) -> Result<serde_json::Value, Self::Error> {
357 let (sender, receiver) = chan::bounded(1);
358 let query: Arc<QueryState> = Arc::new(move |state| {
359 let debug = serde_json::json!({
360 "outboxSize": state.outbox().len(),
361 "fetching": state.fetching(),
362 "rateLimiter": state.limiter(),
363 "events": json!({
364 "subscribers": state.emitter().subscriptions(),
365 "pending": state.emitter().pending(),
366 }),
367 "metrics": state.metrics(),
368 });
369 sender.send(debug).ok();
370
371 Ok(())
372 });
373 let (err_sender, err_receiver) = chan::bounded(1);
374 self.command(service::Command::QueryState(query, err_sender))?;
375 err_receiver.recv()??;
376
377 let debug = receiver.recv()?;
378
379 Ok(debug)
380 }
381}