1use std::net;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::Arc;
4use std::{fmt, io, time};
5
6#[cfg(unix)]
7use std::os::unix::net::UnixStream as Stream;
8#[cfg(windows)]
9use winpipe::WinStream as Stream;
10
11use crossbeam_channel as chan;
12use radicle::node::events::{Event, Events};
13use radicle::node::policy;
14use radicle::node::{Config, NodeId};
15use radicle::node::{ConnectOptions, ConnectResult, Seeds};
16use reactor::poller::popol::PopolWaker;
17use serde_json::json;
18use thiserror::Error;
19
20use crate::identity::RepoId;
21use crate::node::{Alias, Command, FetchResult};
22use crate::profile::Home;
23use crate::runtime::Emitter;
24use crate::service;
25use crate::service::{CommandError, QueryState};
26use crate::storage::refs::RefsAt;
27use crate::wire;
28use crate::wire::StreamId;
29use crate::worker::TaskResult;
30
31#[derive(Error, Debug)]
33pub enum Error {
34 #[error("command channel is not connected")]
36 ChannelDisconnected,
37 #[error("command failed: {0}")]
39 Command(#[from] CommandError),
40 #[error("the operation timed out")]
42 Timeout,
43 #[error(transparent)]
45 Io(#[from] std::io::Error),
46}
47
48impl From<chan::RecvError> for Error {
49 fn from(_: chan::RecvError) -> Self {
50 Self::ChannelDisconnected
51 }
52}
53
54impl From<chan::RecvTimeoutError> for Error {
55 fn from(err: chan::RecvTimeoutError) -> Self {
56 match err {
57 chan::RecvTimeoutError::Timeout => Self::Timeout,
58 chan::RecvTimeoutError::Disconnected => Self::ChannelDisconnected,
59 }
60 }
61}
62
63impl<T> From<chan::SendError<T>> for Error {
64 fn from(_: chan::SendError<T>) -> Self {
65 Self::ChannelDisconnected
66 }
67}
68
69pub struct Handle {
70 pub(crate) home: Home,
71 pub(crate) controller: reactor::Controller<wire::Control, PopolWaker>,
72
73 shutdown: Arc<AtomicBool>,
75 emitter: Emitter<Event>,
77}
78
79impl Handle {
80 pub fn events(&self) -> Events {
82 Events::from(self.emitter.subscribe())
83 }
84}
85
86impl fmt::Debug for Handle {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 f.debug_struct("Handle").field("home", &self.home).finish()
89 }
90}
91
92impl Clone for Handle {
93 fn clone(&self) -> Self {
94 Self {
95 home: self.home.clone(),
96 controller: self.controller.clone(),
97 shutdown: self.shutdown.clone(),
98 emitter: self.emitter.clone(),
99 }
100 }
101}
102
103impl Handle {
104 pub fn new(
105 home: Home,
106 controller: reactor::Controller<wire::Control, PopolWaker>,
107 emitter: Emitter<Event>,
108 ) -> Self {
109 Self {
110 home,
111 controller,
112 shutdown: Arc::default(),
113 emitter,
114 }
115 }
116
117 pub fn worker_result(&mut self, result: TaskResult) -> Result<(), io::Error> {
118 self.controller.cmd(wire::Control::Worker(result))
119 }
120
121 pub fn flush(&mut self, remote: NodeId, stream: StreamId) -> Result<(), io::Error> {
122 self.controller.cmd(wire::Control::Flush { remote, stream })
123 }
124
125 pub(crate) fn command(&self, cmd: service::Command) -> Result<(), io::Error> {
126 self.controller.cmd(wire::Control::User(cmd))
127 }
128}
129
130impl radicle::node::Handle for Handle {
131 type Sessions = Vec<radicle::node::Session>;
132 type Events = Events;
133 type Event = Event;
134 type Error = Error;
135
136 fn nid(&self) -> Result<NodeId, Self::Error> {
137 let (sender, receiver) = chan::bounded(1);
138 let query: Arc<QueryState> = Arc::new(move |state| {
139 sender.send(*state.nid()).ok();
140 Ok(())
141 });
142 let (err_sender, err_receiver) = chan::bounded(1);
143 self.command(service::Command::QueryState(query, err_sender))?;
144 err_receiver.recv()??;
145
146 let nid = receiver.recv()?;
147
148 Ok(nid)
149 }
150
151 fn is_running(&self) -> bool {
152 true
153 }
154
155 fn connect(
156 &mut self,
157 node: NodeId,
158 addr: radicle::node::Address,
159 opts: ConnectOptions,
160 ) -> Result<ConnectResult, Error> {
161 let events = self.events();
162 let timeout = opts.timeout;
163 let sessions = self.sessions()?;
164 let session = sessions.iter().find(|s| s.nid == node);
165
166 if let Some(s) = session {
167 if s.state.is_connected() {
168 return Ok(ConnectResult::Connected);
169 }
170 }
171 self.command(service::Command::Connect(node, addr, opts))?;
172
173 events
174 .wait(
175 |e| match e {
176 Event::PeerConnected { nid } if nid == &node => Some(ConnectResult::Connected),
177 Event::PeerDisconnected { nid, reason } if nid == &node => {
178 Some(ConnectResult::Disconnected {
179 reason: reason.clone(),
180 })
181 }
182 _ => None,
183 },
184 timeout,
185 )
186 .map_err(Error::from)
187 }
188
189 fn disconnect(&mut self, node: NodeId) -> Result<(), Self::Error> {
190 let events = self.events();
191 self.command(service::Command::Disconnect(node))?;
192 events
193 .wait(
194 |e| match e {
195 Event::PeerDisconnected { nid, .. } if nid == &node => Some(()),
196 _ => None,
197 },
198 time::Duration::MAX,
199 )
200 .map_err(Error::from)
201 }
202
203 fn seeds(&mut self, id: RepoId) -> Result<Seeds, Self::Error> {
204 let (sender, receiver) = chan::bounded(1);
205 self.command(service::Command::Seeds(id, sender))?;
206 receiver.recv().map_err(Error::from)
207 }
208
209 fn config(&self) -> Result<Config, Self::Error> {
210 let (sender, receiver) = chan::bounded(1);
211 self.command(service::Command::Config(sender))?;
212 receiver.recv().map_err(Error::from)
213 }
214
215 fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Self::Error> {
216 let (sender, receiver) = chan::bounded(1);
217 self.command(service::Command::ListenAddrs(sender))?;
218 receiver.recv().map_err(Error::from)
219 }
220
221 fn fetch(
222 &mut self,
223 id: RepoId,
224 from: NodeId,
225 timeout: time::Duration,
226 ) -> Result<FetchResult, Error> {
227 let (sender, receiver) = chan::bounded(1);
228 self.command(service::Command::Fetch(id, from, timeout, sender))?;
229 receiver.recv().map_err(Error::from)
230 }
231
232 fn follow(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
233 let (sender, receiver) = chan::bounded(1);
234 self.command(service::Command::Follow(id, alias, sender))?;
235 receiver.recv().map_err(Error::from)
236 }
237
238 fn unfollow(&mut self, id: NodeId) -> Result<bool, Error> {
239 let (sender, receiver) = chan::bounded(1);
240 self.command(service::Command::Unfollow(id, sender))?;
241 receiver.recv().map_err(Error::from)
242 }
243
244 fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Error> {
245 let (sender, receiver) = chan::bounded(1);
246 self.command(service::Command::Seed(id, scope, sender))?;
247 receiver.recv().map_err(Error::from)
248 }
249
250 fn unseed(&mut self, id: RepoId) -> Result<bool, Error> {
251 let (sender, receiver) = chan::bounded(1);
252 self.command(service::Command::Unseed(id, sender))?;
253 receiver.recv().map_err(Error::from)
254 }
255
256 fn announce_refs(&mut self, id: RepoId) -> Result<RefsAt, Error> {
257 let (sender, receiver) = chan::bounded(1);
258 self.command(service::Command::AnnounceRefs(id, sender))?;
259 receiver.recv().map_err(Error::from)
260 }
261
262 fn announce_inventory(&mut self) -> Result<(), Error> {
263 self.command(service::Command::AnnounceInventory)
264 .map_err(Error::from)
265 }
266
267 fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
268 let (sender, receiver) = chan::bounded(1);
269 self.command(service::Command::AddInventory(rid, sender))?;
270 receiver.recv().map_err(Error::from)
271 }
272
273 fn subscribe(&self, _timeout: time::Duration) -> Result<Self::Events, Self::Error> {
274 Ok(self.events())
275 }
276
277 fn sessions(&self) -> Result<Self::Sessions, Error> {
278 let (sender, receiver) = chan::unbounded();
279 let query: Arc<QueryState> = Arc::new(move |state| {
280 let sessions = state
281 .sessions()
282 .values()
283 .map(radicle::node::Session::from)
284 .collect();
285 sender.send(sessions).ok();
286
287 Ok(())
288 });
289 let (err_sender, err_receiver) = chan::bounded(1);
290 self.command(service::Command::QueryState(query, err_sender))?;
291 err_receiver.recv()??;
292
293 let sessions = receiver.recv()?;
294
295 Ok(sessions)
296 }
297
298 fn session(&self, nid: NodeId) -> Result<Option<radicle::node::Session>, Self::Error> {
299 let (sender, receiver) = chan::bounded(1);
300 let query: Arc<QueryState> = Arc::new(move |state| {
301 let session = state.sessions().get(&nid).map(radicle::node::Session::from);
302 sender.send(session).ok();
303
304 Ok(())
305 });
306 let (err_sender, err_receiver) = chan::bounded(1);
307 self.command(service::Command::QueryState(query, err_sender))?;
308 err_receiver.recv()??;
309
310 let sessions = receiver.recv()?;
311
312 Ok(sessions)
313 }
314
315 fn shutdown(self) -> Result<(), Error> {
316 if self
318 .shutdown
319 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
320 .is_err()
321 {
322 return Ok(());
323 }
324 Stream::connect(self.home.socket())
328 .and_then(|sock| Command::Shutdown.to_writer(sock))
329 .ok();
330
331 self.controller
332 .shutdown()
333 .map_err(|_| Error::ChannelDisconnected)
334 }
335
336 fn debug(&self) -> Result<serde_json::Value, Self::Error> {
337 let (sender, receiver) = chan::bounded(1);
338 let query: Arc<QueryState> = Arc::new(move |state| {
339 let debug = serde_json::json!({
340 "outboxSize": state.outbox().len(),
341 "fetching": state.fetching().iter().map(|(rid, state)| {
342 json!({
343 "rid": rid,
344 "from": state.from,
345 "refsAt": state.refs_at,
346 "subscribers": state.subscribers.len(),
347 })
348 }).collect::<Vec<_>>(),
349 "queue": state.sessions().values().map(|sess| {
350 json!({
351 "nid": sess.id,
352 "queue": sess.queue.iter().map(|fetch| {
353 json!({
354 "rid": fetch.rid,
355 "from": fetch.from,
356 "refsAt": fetch.refs_at,
357 })
358 }).collect::<Vec<_>>()
359 })
360 }).collect::<Vec<_>>(),
361 "rateLimiter": state.limiter().buckets.iter().map(|(host, bucket)| {
362 json!({
363 "host": host.to_string(),
364 "bucket": bucket
365 })
366 }).collect::<Vec<_>>(),
367 "events": json!({
368 "subscribers": state.emitter().subscriptions(),
369 "pending": state.emitter().pending(),
370 }),
371 "metrics": state.metrics(),
372 });
373 sender.send(debug).ok();
374
375 Ok(())
376 });
377 let (err_sender, err_receiver) = chan::bounded(1);
378 self.command(service::Command::QueryState(query, err_sender))?;
379 err_receiver.recv()??;
380
381 let debug = receiver.recv()?;
382
383 Ok(debug)
384 }
385}