1use std::collections::HashSet;
2use std::net;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::{fmt, io, time};
6
7#[cfg(unix)]
8use std::os::unix::net::UnixStream;
9#[cfg(windows)]
10use uds_windows::UnixStream;
11
12use crossbeam_channel as chan;
13use radicle::crypto::PublicKey;
14use radicle::node::events::{Event, Events};
15use radicle::node::policy;
16use radicle::node::{Config, NodeId};
17use radicle::node::{ConnectOptions, ConnectResult, Seeds};
18use radicle::storage::refs;
19use serde_json::json;
20use thiserror::Error;
21
22use crate::identity::RepoId;
23use crate::node::{Alias, Command, FetchResult};
24use crate::profile::Home;
25use crate::reactor;
26use crate::runtime::Emitter;
27use crate::service;
28use crate::service::QueryState;
29use crate::storage::refs::RefsAt;
30use crate::wire;
31use crate::wire::StreamId;
32use crate::worker::TaskResult;
33
34#[derive(Error, Debug)]
36pub enum Error {
37 #[error("command channel is not connected")]
39 ChannelDisconnected,
40 #[error("command failed: {0}")]
42 Command(#[from] service::command::Error),
43 #[error("the operation timed out")]
45 Timeout,
46 #[error(transparent)]
48 Io(#[from] std::io::Error),
49}
50
51impl From<chan::RecvError> for Error {
52 fn from(_: chan::RecvError) -> Self {
53 Self::ChannelDisconnected
54 }
55}
56
57impl From<chan::RecvTimeoutError> for Error {
58 fn from(err: chan::RecvTimeoutError) -> Self {
59 match err {
60 chan::RecvTimeoutError::Timeout => Self::Timeout,
61 chan::RecvTimeoutError::Disconnected => Self::ChannelDisconnected,
62 }
63 }
64}
65
66impl<T> From<chan::SendError<T>> for Error {
67 fn from(_: chan::SendError<T>) -> Self {
68 Self::ChannelDisconnected
69 }
70}
71
72pub struct Handle {
73 pub(crate) home: Home,
74 pub(crate) controller: reactor::Controller,
75
76 shutdown: Arc<AtomicBool>,
78 emitter: Emitter<Event>,
80}
81
82impl Handle {
83 pub fn events(&self) -> Events {
85 Events::from(self.emitter.subscribe())
86 }
87}
88
89impl fmt::Debug for Handle {
90 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
91 f.debug_struct("Handle").field("home", &self.home).finish()
92 }
93}
94
95impl Clone for Handle {
96 fn clone(&self) -> Self {
97 Self {
98 home: self.home.clone(),
99 controller: self.controller.clone(),
100 shutdown: self.shutdown.clone(),
101 emitter: self.emitter.clone(),
102 }
103 }
104}
105
106impl Handle {
107 pub fn new(home: Home, controller: reactor::Controller, emitter: Emitter<Event>) -> Self {
108 Self {
109 home,
110 controller,
111 shutdown: Arc::default(),
112 emitter,
113 }
114 }
115
116 pub fn worker_result(&mut self, result: TaskResult) -> Result<(), io::Error> {
117 self.controller.cmd(wire::Control::Worker(result))
118 }
119
120 pub fn flush(&mut self, remote: NodeId, stream: StreamId) -> Result<(), io::Error> {
121 self.controller.cmd(wire::Control::Flush { remote, stream })
122 }
123
124 pub(crate) fn command(&self, cmd: service::Command) -> Result<(), io::Error> {
125 self.controller.cmd(wire::Control::User(cmd))
126 }
127}
128
129impl radicle::node::Handle for Handle {
130 type Sessions = Vec<radicle::node::Session>;
131 type Events = Events;
132 type Event = Event;
133 type Error = Error;
134
135 fn nid(&self) -> Result<NodeId, Self::Error> {
136 let (sender, receiver) = chan::bounded(1);
137 let query: Arc<QueryState> = Arc::new(move |state| {
138 sender.send(*state.nid()).ok();
139 Ok(())
140 });
141 let (err_sender, err_receiver) = chan::bounded(1);
142 self.command(service::Command::QueryState(query, err_sender))?;
143 err_receiver.recv()??;
144
145 let nid = receiver.recv()?;
146
147 Ok(nid)
148 }
149
150 fn is_running(&self) -> bool {
151 true
152 }
153
154 fn connect(
155 &mut self,
156 node: NodeId,
157 addr: radicle::node::Address,
158 opts: ConnectOptions,
159 ) -> Result<ConnectResult, Error> {
160 let events = self.events();
161 let timeout = opts.timeout;
162 let sessions = self.sessions()?;
163 let session = sessions.iter().find(|s| s.nid == node);
164
165 if let Some(s) = session {
166 if s.state.is_connected() {
167 return Ok(ConnectResult::Connected);
168 }
169 }
170 self.command(service::Command::Connect(node, addr, opts))?;
171
172 events
173 .wait(
174 |e| match e {
175 Event::PeerConnected { nid } if nid == &node => Some(ConnectResult::Connected),
176 Event::PeerDisconnected { nid, reason } if nid == &node => {
177 Some(ConnectResult::Disconnected {
178 reason: reason.clone(),
179 })
180 }
181 _ => None,
182 },
183 timeout,
184 )
185 .map_err(Error::from)
186 }
187
188 fn disconnect(&mut self, node: NodeId) -> Result<(), Self::Error> {
189 let events = self.events();
190 self.command(service::Command::Disconnect(node))?;
191 events
192 .wait(
193 |e| match e {
194 Event::PeerDisconnected { nid, .. } if nid == &node => Some(()),
195 _ => None,
196 },
197 time::Duration::MAX,
198 )
199 .map_err(Error::from)
200 }
201
202 fn seeds_for(
203 &mut self,
204 id: RepoId,
205 namespaces: impl IntoIterator<Item = PublicKey>,
206 ) -> Result<Seeds, Self::Error> {
207 let (responder, receiver) = service::command::Responder::oneshot();
208 self.command(service::Command::Seeds(
209 id,
210 HashSet::from_iter(namespaces),
211 responder,
212 ))?;
213 Ok(receiver.recv()??)
214 }
215
216 fn config(&self) -> Result<Config, Self::Error> {
217 let (responder, receiver) = service::command::Responder::oneshot();
218 self.command(service::Command::Config(responder))?;
219 Ok(receiver.recv()??)
220 }
221
222 fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Self::Error> {
223 let (responder, receiver) = service::command::Responder::oneshot();
224 self.command(service::Command::ListenAddrs(responder))?;
225 Ok(receiver.recv()??)
226 }
227
228 fn fetch(
229 &mut self,
230 id: RepoId,
231 from: NodeId,
232 timeout: time::Duration,
233 signed_references_minimum_feature_level: Option<refs::FeatureLevel>,
234 ) -> Result<FetchResult, Error> {
235 let (responder, receiver) = service::command::Responder::oneshot();
236 self.command(service::Command::Fetch(
237 id,
238 from,
239 timeout,
240 signed_references_minimum_feature_level,
241 responder,
242 ))?;
243 Ok(receiver.recv()??)
244 }
245
246 fn follow(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
247 let (responder, receiver) = service::command::Responder::oneshot();
248 self.command(service::Command::Follow(id, alias, responder))?;
249 Ok(receiver.recv()??)
250 }
251
252 fn unfollow(&mut self, id: NodeId) -> Result<bool, Error> {
253 let (responder, receiver) = service::command::Responder::oneshot();
254 self.command(service::Command::Unfollow(id, responder))?;
255 Ok(receiver.recv()??)
256 }
257
258 fn block(&mut self, id: NodeId) -> Result<bool, Self::Error> {
259 let (sender, receiver) = chan::bounded(1);
260 self.command(service::Command::Block(id, sender))?;
261 receiver.recv().map_err(Error::from)
262 }
263
264 fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Error> {
265 let (responder, receiver) = service::command::Responder::oneshot();
266 self.command(service::Command::Seed(id, scope, responder))?;
267 Ok(receiver.recv()??)
268 }
269
270 fn unseed(&mut self, id: RepoId) -> Result<bool, Error> {
271 let (responder, receiver) = service::command::Responder::oneshot();
272 self.command(service::Command::Unseed(id, responder))?;
273 Ok(receiver.recv()??)
274 }
275
276 fn announce_refs_for(
277 &mut self,
278 id: RepoId,
279 namespaces: impl IntoIterator<Item = PublicKey>,
280 ) -> Result<RefsAt, Error> {
281 let (responder, receiver) = service::command::Responder::oneshot();
282 self.command(service::Command::AnnounceRefs(
283 id,
284 HashSet::from_iter(namespaces),
285 responder,
286 ))?;
287 Ok(receiver.recv()??)
288 }
289
290 fn announce_inventory(&mut self) -> Result<(), Error> {
291 self.command(service::Command::AnnounceInventory)
292 .map_err(Error::from)
293 }
294
295 fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
296 let (responder, receiver) = service::command::Responder::oneshot();
297 self.command(service::Command::AddInventory(rid, responder))?;
298 Ok(receiver.recv()??)
299 }
300
301 fn subscribe(&self, _timeout: time::Duration) -> Result<Self::Events, Self::Error> {
302 Ok(self.events())
303 }
304
305 fn sessions(&self) -> Result<Self::Sessions, Error> {
306 let (sender, receiver) = chan::unbounded();
307 let query: Arc<QueryState> = Arc::new(move |state| {
308 let sessions = state
309 .sessions()
310 .values()
311 .map(radicle::node::Session::from)
312 .collect();
313 sender.send(sessions).ok();
314
315 Ok(())
316 });
317 let (err_sender, err_receiver) = chan::bounded(1);
318 self.command(service::Command::QueryState(query, err_sender))?;
319 err_receiver.recv()??;
320
321 let sessions = receiver.recv()?;
322
323 Ok(sessions)
324 }
325
326 fn session(&self, nid: NodeId) -> Result<Option<radicle::node::Session>, Self::Error> {
327 let (sender, receiver) = chan::bounded(1);
328 let query: Arc<QueryState> = Arc::new(move |state| {
329 let session = state.sessions().get(&nid).map(radicle::node::Session::from);
330 sender.send(session).ok();
331
332 Ok(())
333 });
334 let (err_sender, err_receiver) = chan::bounded(1);
335 self.command(service::Command::QueryState(query, err_sender))?;
336 err_receiver.recv()??;
337
338 let sessions = receiver.recv()?;
339
340 Ok(sessions)
341 }
342
343 fn shutdown(self) -> Result<(), Error> {
344 if self
346 .shutdown
347 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
348 .is_err()
349 {
350 return Ok(());
351 }
352 UnixStream::connect(self.home.socket())
356 .and_then(|sock| Command::Shutdown.to_writer(sock))
357 .ok();
358
359 self.controller
360 .shutdown()
361 .map_err(|_| Error::ChannelDisconnected)
362 }
363
364 fn debug(&self) -> Result<serde_json::Value, Self::Error> {
365 let (sender, receiver) = chan::bounded(1);
366 let query: Arc<QueryState> = Arc::new(move |state| {
367 let fetching = debug::Fetching::new(state.fetching());
368 let debug = serde_json::json!({
369 "outboxSize": state.outbox().len(),
370 "fetching": fetching,
371 "rateLimiter": state.limiter().buckets.iter().map(|(host, bucket)| {
372 json!({
373 "host": host.to_string(),
374 "bucket": bucket
375 })
376 }).collect::<Vec<_>>(),
377 "events": json!({
378 "subscribers": state.emitter().subscriptions(),
379 "pending": state.emitter().pending(),
380 }),
381 "metrics": state.metrics(),
382 });
383 sender.send(debug).ok();
384
385 Ok(())
386 });
387 let (err_sender, err_receiver) = chan::bounded(1);
388 self.command(service::Command::QueryState(query, err_sender))?;
389 err_receiver.recv()??;
390
391 let debug = receiver.recv()?;
392
393 Ok(debug)
394 }
395}
396
397mod debug {
398 use radicle_protocol::fetcher;
401 use radicle_protocol::fetcher::FetcherState;
402 use serde::Serialize;
403
404 use super::{NodeId, RefsAt, RepoId};
405
406 #[derive(Serialize)]
407 #[serde(rename_all = "camelCase")]
408 pub struct Fetching {
409 active: Vec<ActiveFetch>,
410 queued: Vec<QueuedFetch>,
411 }
412
413 impl Fetching {
414 pub fn new(state: &FetcherState) -> Self {
415 let active = state
416 .active_fetches()
417 .iter()
418 .map(|(rid, fetch)| ActiveFetch::new(*rid, fetch.clone()))
419 .collect();
420 let queued = state
421 .queued_fetches()
422 .iter()
423 .flat_map(|(node, queue)| {
424 queue
425 .iter()
426 .map(|fetch| QueuedFetch::new(*node, fetch.clone()))
427 })
428 .collect();
429 Self { active, queued }
430 }
431 }
432
433 #[derive(Serialize)]
434 #[serde(rename_all = "camelCase")]
435 pub struct ActiveFetch {
436 rid: RepoId,
437 from: NodeId,
438 refs_at: Vec<RefsAt>,
439 }
440
441 impl ActiveFetch {
442 pub fn new(rid: RepoId, fetch: fetcher::ActiveFetch) -> Self {
443 Self {
444 rid,
445 from: fetch.from,
446 refs_at: fetch.refs.into(),
447 }
448 }
449 }
450
451 #[derive(Serialize)]
452 #[serde(rename_all = "camelCase")]
453 pub struct QueuedFetch {
454 nid: NodeId,
455 rid: RepoId,
456 refs_at: Vec<RefsAt>,
457 }
458
459 impl QueuedFetch {
460 pub fn new(node: NodeId, fetch: fetcher::QueuedFetch) -> Self {
461 Self {
462 nid: node,
463 rid: fetch.rid,
464 refs_at: fetch.refs.into(),
465 }
466 }
467 }
468}