1use std::collections::HashSet;
2use std::net;
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::{fmt, io, time};
7
8#[cfg(unix)]
9use std::os::unix::net::UnixStream;
10#[cfg(windows)]
11use uds_windows::UnixStream;
12
13use crossbeam_channel as chan;
14use radicle::crypto::PublicKey;
15use radicle::node::events::{Event, Events};
16use radicle::node::policy;
17use radicle::node::{Config, NodeId};
18use radicle::node::{ConnectOptions, ConnectResult, Seeds};
19use radicle::storage::refs;
20use serde_json::json;
21use thiserror::Error;
22
23use crate::identity::RepoId;
24use crate::node::{Alias, Command, FetchResult};
25use crate::profile::Home;
26use crate::reactor;
27use crate::runtime::Emitter;
28use crate::service;
29use crate::service::QueryState;
30use crate::storage::refs::RefsAt;
31use crate::wire;
32use crate::wire::StreamId;
33use crate::worker::TaskResult;
34
35#[derive(Error, Debug)]
37pub enum Error {
38 #[error("command channel is not connected")]
40 ChannelDisconnected,
41 #[error("command failed: {0}")]
43 Command(#[from] service::command::Error),
44 #[error("the operation timed out")]
46 Timeout,
47 #[error(transparent)]
49 Io(#[from] std::io::Error),
50}
51
52impl From<chan::RecvError> for Error {
53 fn from(_: chan::RecvError) -> Self {
54 Self::ChannelDisconnected
55 }
56}
57
58impl From<chan::RecvTimeoutError> for Error {
59 fn from(err: chan::RecvTimeoutError) -> Self {
60 match err {
61 chan::RecvTimeoutError::Timeout => Self::Timeout,
62 chan::RecvTimeoutError::Disconnected => Self::ChannelDisconnected,
63 }
64 }
65}
66
67impl<T> From<chan::SendError<T>> for Error {
68 fn from(_: chan::SendError<T>) -> Self {
69 Self::ChannelDisconnected
70 }
71}
72
73pub struct Handle {
74 pub(crate) home: Home,
75
76 pub(crate) socket: PathBuf,
78
79 pub(crate) controller: reactor::Controller,
80
81 shutdown: Arc<AtomicBool>,
83 emitter: Emitter<Event>,
85}
86
87impl Handle {
88 pub fn events(&self) -> Events {
90 Events::from(self.emitter.subscribe())
91 }
92}
93
94impl fmt::Debug for Handle {
95 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96 f.debug_struct("Handle").field("home", &self.home).finish()
97 }
98}
99
100impl Clone for Handle {
101 fn clone(&self) -> Self {
102 Self {
103 home: self.home.clone(),
104 socket: self.socket.clone(),
105 controller: self.controller.clone(),
106 shutdown: self.shutdown.clone(),
107 emitter: self.emitter.clone(),
108 }
109 }
110}
111
112impl Handle {
113 pub fn new(
114 home: Home,
115 socket: PathBuf,
116 controller: reactor::Controller,
117 emitter: Emitter<Event>,
118 ) -> Self {
119 Self {
120 home,
121 socket,
122 controller,
123 shutdown: Arc::default(),
124 emitter,
125 }
126 }
127
128 pub fn worker_result(&mut self, result: TaskResult) -> Result<(), io::Error> {
129 self.controller.cmd(wire::Control::Worker(result))
130 }
131
132 pub fn flush(&mut self, remote: NodeId, stream: StreamId) -> Result<(), io::Error> {
133 self.controller.cmd(wire::Control::Flush { remote, stream })
134 }
135
136 pub(crate) fn command(&self, cmd: service::Command) -> Result<(), io::Error> {
137 self.controller.cmd(wire::Control::User(cmd))
138 }
139}
140
141impl radicle::node::Handle for Handle {
142 type Sessions = Vec<radicle::node::Session>;
143 type Events = Events;
144 type Event = Event;
145 type Error = Error;
146
147 fn nid(&self) -> Result<NodeId, Self::Error> {
148 let (sender, receiver) = chan::bounded(1);
149 let query: Arc<QueryState> = Arc::new(move |state| {
150 sender.send(*state.nid()).ok();
151 Ok(())
152 });
153 let (err_sender, err_receiver) = chan::bounded(1);
154 self.command(service::Command::QueryState(query, err_sender))?;
155 err_receiver.recv()??;
156
157 let nid = receiver.recv()?;
158
159 Ok(nid)
160 }
161
162 fn is_running(&self) -> bool {
163 true
164 }
165
166 fn connect(
167 &mut self,
168 node: NodeId,
169 addr: radicle::node::Address,
170 opts: ConnectOptions,
171 ) -> Result<ConnectResult, Error> {
172 let events = self.events();
173 let timeout = opts.timeout;
174 let sessions = self.sessions()?;
175 let session = sessions.iter().find(|s| s.nid == node);
176
177 if let Some(s) = session {
178 if s.state.is_connected() {
179 return Ok(ConnectResult::Connected);
180 }
181 }
182 self.command(service::Command::Connect(node, addr, opts))?;
183
184 events
185 .wait(
186 |e| match e {
187 Event::PeerConnected { nid } if nid == &node => Some(ConnectResult::Connected),
188 Event::PeerDisconnected { nid, reason } if nid == &node => {
189 Some(ConnectResult::Disconnected {
190 reason: reason.clone(),
191 })
192 }
193 _ => None,
194 },
195 timeout,
196 )
197 .map_err(Error::from)
198 }
199
200 fn disconnect(&mut self, node: NodeId) -> Result<(), Self::Error> {
201 let events = self.events();
202 self.command(service::Command::Disconnect(node))?;
203 events
204 .wait(
205 |e| match e {
206 Event::PeerDisconnected { nid, .. } if nid == &node => Some(()),
207 _ => None,
208 },
209 time::Duration::MAX,
210 )
211 .map_err(Error::from)
212 }
213
214 fn seeds_for(
215 &mut self,
216 id: RepoId,
217 namespaces: impl IntoIterator<Item = PublicKey>,
218 ) -> Result<Seeds, Self::Error> {
219 let (responder, receiver) = service::command::Responder::oneshot();
220 self.command(service::Command::Seeds(
221 id,
222 HashSet::from_iter(namespaces),
223 responder,
224 ))?;
225 Ok(receiver.recv()??)
226 }
227
228 fn config(&self) -> Result<Config, Self::Error> {
229 let (responder, receiver) = service::command::Responder::oneshot();
230 self.command(service::Command::Config(responder))?;
231 Ok(receiver.recv()??)
232 }
233
234 fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Self::Error> {
235 let (responder, receiver) = service::command::Responder::oneshot();
236 self.command(service::Command::ListenAddrs(responder))?;
237 Ok(receiver.recv()??)
238 }
239
240 fn fetch(
241 &mut self,
242 id: RepoId,
243 from: NodeId,
244 timeout: time::Duration,
245 signed_references_minimum_feature_level: Option<refs::FeatureLevel>,
246 ) -> Result<FetchResult, Error> {
247 let (responder, receiver) = service::command::Responder::oneshot();
248 self.command(service::Command::Fetch(
249 id,
250 from,
251 timeout,
252 signed_references_minimum_feature_level,
253 responder,
254 ))?;
255 Ok(receiver.recv()??)
256 }
257
258 fn follow(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
259 let (responder, receiver) = service::command::Responder::oneshot();
260 self.command(service::Command::Follow(id, alias, responder))?;
261 Ok(receiver.recv()??)
262 }
263
264 fn unfollow(&mut self, id: NodeId) -> Result<bool, Error> {
265 let (responder, receiver) = service::command::Responder::oneshot();
266 self.command(service::Command::Unfollow(id, responder))?;
267 Ok(receiver.recv()??)
268 }
269
270 fn block(&mut self, id: NodeId) -> Result<bool, Self::Error> {
271 let (sender, receiver) = chan::bounded(1);
272 self.command(service::Command::Block(id, sender))?;
273 receiver.recv().map_err(Error::from)
274 }
275
276 fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Error> {
277 let (responder, receiver) = service::command::Responder::oneshot();
278 self.command(service::Command::Seed(id, scope, responder))?;
279 Ok(receiver.recv()??)
280 }
281
282 fn unseed(&mut self, id: RepoId) -> Result<bool, Error> {
283 let (responder, receiver) = service::command::Responder::oneshot();
284 self.command(service::Command::Unseed(id, responder))?;
285 Ok(receiver.recv()??)
286 }
287
288 fn announce_refs_for(
289 &mut self,
290 id: RepoId,
291 namespaces: impl IntoIterator<Item = PublicKey>,
292 ) -> Result<RefsAt, Error> {
293 let (responder, receiver) = service::command::Responder::oneshot();
294 self.command(service::Command::AnnounceRefs(
295 id,
296 HashSet::from_iter(namespaces),
297 responder,
298 ))?;
299 Ok(receiver.recv()??)
300 }
301
302 fn announce_inventory(&mut self) -> Result<(), Error> {
303 self.command(service::Command::AnnounceInventory)
304 .map_err(Error::from)
305 }
306
307 fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
308 let (responder, receiver) = service::command::Responder::oneshot();
309 self.command(service::Command::AddInventory(rid, responder))?;
310 Ok(receiver.recv()??)
311 }
312
313 fn subscribe(&self, _timeout: time::Duration) -> Result<Self::Events, Self::Error> {
314 Ok(self.events())
315 }
316
317 fn sessions(&self) -> Result<Self::Sessions, Error> {
318 let (sender, receiver) = chan::unbounded();
319 let query: Arc<QueryState> = Arc::new(move |state| {
320 let sessions = state
321 .sessions()
322 .values()
323 .map(radicle::node::Session::from)
324 .collect();
325 sender.send(sessions).ok();
326
327 Ok(())
328 });
329 let (err_sender, err_receiver) = chan::bounded(1);
330 self.command(service::Command::QueryState(query, err_sender))?;
331 err_receiver.recv()??;
332
333 let sessions = receiver.recv()?;
334
335 Ok(sessions)
336 }
337
338 fn session(&self, nid: NodeId) -> Result<Option<radicle::node::Session>, Self::Error> {
339 let (sender, receiver) = chan::bounded(1);
340 let query: Arc<QueryState> = Arc::new(move |state| {
341 let session = state.sessions().get(&nid).map(radicle::node::Session::from);
342 sender.send(session).ok();
343
344 Ok(())
345 });
346 let (err_sender, err_receiver) = chan::bounded(1);
347 self.command(service::Command::QueryState(query, err_sender))?;
348 err_receiver.recv()??;
349
350 let sessions = receiver.recv()?;
351
352 Ok(sessions)
353 }
354
355 fn shutdown(self) -> Result<(), Error> {
356 if self
358 .shutdown
359 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
360 .is_err()
361 {
362 return Ok(());
363 }
364 UnixStream::connect(self.socket)
368 .and_then(|sock| Command::Shutdown.to_writer(sock))
369 .ok();
370
371 self.controller
372 .shutdown()
373 .map_err(|_| Error::ChannelDisconnected)
374 }
375
376 fn debug(&self) -> Result<serde_json::Value, Self::Error> {
377 let (sender, receiver) = chan::bounded(1);
378 let query: Arc<QueryState> = Arc::new(move |state| {
379 let fetching = debug::Fetching::new(state.fetching());
380 let debug = serde_json::json!({
381 "outboxSize": state.outbox().len(),
382 "fetching": fetching,
383 "rateLimiter": state.limiter().buckets.iter().map(|(host, bucket)| {
384 json!({
385 "host": host.to_string(),
386 "bucket": bucket
387 })
388 }).collect::<Vec<_>>(),
389 "events": json!({
390 "subscribers": state.emitter().subscriptions(),
391 "pending": state.emitter().pending(),
392 }),
393 "metrics": state.metrics(),
394 });
395 sender.send(debug).ok();
396
397 Ok(())
398 });
399 let (err_sender, err_receiver) = chan::bounded(1);
400 self.command(service::Command::QueryState(query, err_sender))?;
401 err_receiver.recv()??;
402
403 let debug = receiver.recv()?;
404
405 Ok(debug)
406 }
407}
408
409mod debug {
410 use radicle_protocol::fetcher;
413 use radicle_protocol::fetcher::FetcherState;
414 use serde::Serialize;
415
416 use super::{NodeId, RefsAt, RepoId};
417
418 #[derive(Serialize)]
419 #[serde(rename_all = "camelCase")]
420 pub struct Fetching {
421 active: Vec<ActiveFetch>,
422 queued: Vec<QueuedFetch>,
423 }
424
425 impl Fetching {
426 pub fn new(state: &FetcherState) -> Self {
427 let active = state
428 .active_fetches()
429 .iter()
430 .map(|(rid, fetch)| ActiveFetch::new(*rid, fetch.clone()))
431 .collect();
432 let queued = state
433 .queued_fetches()
434 .iter()
435 .flat_map(|(node, queue)| {
436 queue
437 .iter()
438 .map(|fetch| QueuedFetch::new(*node, fetch.clone()))
439 })
440 .collect();
441 Self { active, queued }
442 }
443 }
444
445 #[derive(Serialize)]
446 #[serde(rename_all = "camelCase")]
447 pub struct ActiveFetch {
448 rid: RepoId,
449 from: NodeId,
450 refs_at: Vec<RefsAt>,
451 }
452
453 impl ActiveFetch {
454 pub fn new(rid: RepoId, fetch: fetcher::ActiveFetch) -> Self {
455 Self {
456 rid,
457 from: fetch.from,
458 refs_at: fetch.refs.into(),
459 }
460 }
461 }
462
463 #[derive(Serialize)]
464 #[serde(rename_all = "camelCase")]
465 pub struct QueuedFetch {
466 nid: NodeId,
467 rid: RepoId,
468 refs_at: Vec<RefsAt>,
469 }
470
471 impl QueuedFetch {
472 pub fn new(node: NodeId, fetch: fetcher::QueuedFetch) -> Self {
473 Self {
474 nid: node,
475 rid: fetch.rid,
476 refs_at: fetch.refs.into(),
477 }
478 }
479 }
480}