1use std::net;
2use std::os::unix::net::UnixStream;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::{fmt, io, time};
6
7use crossbeam_channel as chan;
8use radicle::node::{ConnectOptions, ConnectResult, Link, Seeds};
9use radicle::storage::refs::RefsAt;
10use reactor::poller::popol::PopolWaker;
11use serde_json::json;
12use thiserror::Error;
13
14use crate::identity::RepoId;
15use crate::node::{Alias, Command, FetchResult};
16use crate::profile::Home;
17use crate::runtime::Emitter;
18use crate::service;
19use crate::service::policy;
20use crate::service::NodeId;
21use crate::service::{CommandError, Config, QueryState};
22use crate::service::{Event, Events};
23use crate::wire;
24use crate::wire::StreamId;
25use crate::worker::TaskResult;
26
27#[derive(Error, Debug)]
29pub enum Error {
30 #[error("command channel is not connected")]
32 ChannelDisconnected,
33 #[error("command failed: {0}")]
35 Command(#[from] CommandError),
36 #[error("the operation timed out")]
38 Timeout,
39 #[error(transparent)]
41 Io(#[from] std::io::Error),
42}
43
44impl From<chan::RecvError> for Error {
45 fn from(_: chan::RecvError) -> Self {
46 Self::ChannelDisconnected
47 }
48}
49
50impl From<chan::RecvTimeoutError> for Error {
51 fn from(err: chan::RecvTimeoutError) -> Self {
52 match err {
53 chan::RecvTimeoutError::Timeout => Self::Timeout,
54 chan::RecvTimeoutError::Disconnected => Self::ChannelDisconnected,
55 }
56 }
57}
58
59impl<T> From<chan::SendError<T>> for Error {
60 fn from(_: chan::SendError<T>) -> Self {
61 Self::ChannelDisconnected
62 }
63}
64
65pub struct Handle {
66 pub(crate) home: Home,
67 pub(crate) controller: reactor::Controller<wire::Control, PopolWaker>,
68
69 shutdown: Arc<AtomicBool>,
71 emitter: Emitter<Event>,
73}
74
75impl Handle {
76 pub fn events(&self) -> Events {
78 Events::from(self.emitter.subscribe())
79 }
80}
81
82impl fmt::Debug for Handle {
83 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84 f.debug_struct("Handle").field("home", &self.home).finish()
85 }
86}
87
88impl Clone for Handle {
89 fn clone(&self) -> Self {
90 Self {
91 home: self.home.clone(),
92 controller: self.controller.clone(),
93 shutdown: self.shutdown.clone(),
94 emitter: self.emitter.clone(),
95 }
96 }
97}
98
99impl Handle {
100 pub fn new(
101 home: Home,
102 controller: reactor::Controller<wire::Control, PopolWaker>,
103 emitter: Emitter<Event>,
104 ) -> Self {
105 Self {
106 home,
107 controller,
108 shutdown: Arc::default(),
109 emitter,
110 }
111 }
112
113 pub fn worker_result(&mut self, result: TaskResult) -> Result<(), io::Error> {
114 self.controller.cmd(wire::Control::Worker(result))
115 }
116
117 pub fn flush(&mut self, remote: NodeId, stream: StreamId) -> Result<(), io::Error> {
118 self.controller.cmd(wire::Control::Flush { remote, stream })
119 }
120
121 pub(crate) fn command(&self, cmd: service::Command) -> Result<(), io::Error> {
122 self.controller.cmd(wire::Control::User(cmd))
123 }
124}
125
126impl radicle::node::Handle for Handle {
127 type Sessions = Vec<radicle::node::Session>;
128 type Events = Events;
129 type Event = Event;
130 type Error = Error;
131
132 fn nid(&self) -> Result<NodeId, Self::Error> {
133 let (sender, receiver) = chan::bounded(1);
134 let query: Arc<QueryState> = Arc::new(move |state| {
135 sender.send(*state.nid()).ok();
136 Ok(())
137 });
138 let (err_sender, err_receiver) = chan::bounded(1);
139 self.command(service::Command::QueryState(query, err_sender))?;
140 err_receiver.recv()??;
141
142 let nid = receiver.recv()?;
143
144 Ok(nid)
145 }
146
147 fn is_running(&self) -> bool {
148 true
149 }
150
151 fn connect(
152 &mut self,
153 node: NodeId,
154 addr: radicle::node::Address,
155 opts: ConnectOptions,
156 ) -> Result<ConnectResult, Error> {
157 let events = self.events();
158 let timeout = opts.timeout;
159 let sessions = self.sessions()?;
160 let session = sessions.iter().find(|s| s.nid == node);
161
162 if let Some(s) = session {
163 if s.state.is_connected() {
164 return Ok(ConnectResult::Connected);
165 }
166 }
167 self.command(service::Command::Connect(node, addr, opts))?;
168
169 events
170 .wait(
171 |e| match e {
172 Event::PeerConnected { nid } if nid == &node => Some(ConnectResult::Connected),
173 Event::PeerDisconnected { nid, reason } if nid == &node => {
174 Some(ConnectResult::Disconnected {
175 reason: reason.clone(),
176 })
177 }
178 _ => None,
179 },
180 timeout,
181 )
182 .map_err(Error::from)
183 }
184
185 fn disconnect(&mut self, node: NodeId) -> Result<(), Self::Error> {
186 let events = self.events();
187 self.command(service::Command::Disconnect(node))?;
188 events
189 .wait(
190 |e| match e {
191 Event::PeerDisconnected { nid, .. } if nid == &node => Some(()),
192 _ => None,
193 },
194 time::Duration::MAX,
195 )
196 .map_err(Error::from)
197 }
198
199 fn seeds(&mut self, id: RepoId) -> Result<Seeds, Self::Error> {
200 let (sender, receiver) = chan::bounded(1);
201 self.command(service::Command::Seeds(id, sender))?;
202 receiver.recv().map_err(Error::from)
203 }
204
205 fn config(&self) -> Result<Config, Self::Error> {
206 let (sender, receiver) = chan::bounded(1);
207 self.command(service::Command::Config(sender))?;
208 receiver.recv().map_err(Error::from)
209 }
210
211 fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Self::Error> {
212 let (sender, receiver) = chan::bounded(1);
213 self.command(service::Command::ListenAddrs(sender))?;
214 receiver.recv().map_err(Error::from)
215 }
216
217 fn fetch(
218 &mut self,
219 id: RepoId,
220 from: NodeId,
221 timeout: time::Duration,
222 ) -> Result<FetchResult, Error> {
223 let (sender, receiver) = chan::bounded(1);
224 self.command(service::Command::Fetch(id, from, timeout, sender))?;
225 receiver.recv().map_err(Error::from)
226 }
227
228 fn follow(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
229 let (sender, receiver) = chan::bounded(1);
230 self.command(service::Command::Follow(id, alias, sender))?;
231 receiver.recv().map_err(Error::from)
232 }
233
234 fn unfollow(&mut self, id: NodeId) -> Result<bool, Error> {
235 let (sender, receiver) = chan::bounded(1);
236 self.command(service::Command::Unfollow(id, sender))?;
237 receiver.recv().map_err(Error::from)
238 }
239
240 fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Error> {
241 let (sender, receiver) = chan::bounded(1);
242 self.command(service::Command::Seed(id, scope, sender))?;
243 receiver.recv().map_err(Error::from)
244 }
245
246 fn unseed(&mut self, id: RepoId) -> Result<bool, Error> {
247 let (sender, receiver) = chan::bounded(1);
248 self.command(service::Command::Unseed(id, sender))?;
249 receiver.recv().map_err(Error::from)
250 }
251
252 fn announce_refs(&mut self, id: RepoId) -> Result<RefsAt, Error> {
253 let (sender, receiver) = chan::bounded(1);
254 self.command(service::Command::AnnounceRefs(id, sender))?;
255 receiver.recv().map_err(Error::from)
256 }
257
258 fn announce_inventory(&mut self) -> Result<(), Error> {
259 self.command(service::Command::AnnounceInventory)
260 .map_err(Error::from)
261 }
262
263 fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
264 let (sender, receiver) = chan::bounded(1);
265 self.command(service::Command::AddInventory(rid, sender))?;
266 receiver.recv().map_err(Error::from)
267 }
268
269 fn subscribe(&self, _timeout: time::Duration) -> Result<Self::Events, Self::Error> {
270 Ok(self.events())
271 }
272
273 fn sessions(&self) -> Result<Self::Sessions, Error> {
274 let (sender, receiver) = chan::unbounded();
275 let query: Arc<QueryState> = Arc::new(move |state| {
276 let sessions = state
277 .sessions()
278 .iter()
279 .map(|(nid, s)| radicle::node::Session {
280 nid: *nid,
281 link: if s.link.is_inbound() {
282 Link::Inbound
283 } else {
284 Link::Outbound
285 },
286 addr: s.addr.clone(),
287 state: s.state.clone(),
288 })
289 .collect();
290 sender.send(sessions).ok();
291
292 Ok(())
293 });
294 let (err_sender, err_receiver) = chan::bounded(1);
295 self.command(service::Command::QueryState(query, err_sender))?;
296 err_receiver.recv()??;
297
298 let sessions = receiver.recv()?;
299
300 Ok(sessions)
301 }
302
303 fn shutdown(self) -> Result<(), Error> {
304 if self
306 .shutdown
307 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
308 .is_err()
309 {
310 return Ok(());
311 }
312 UnixStream::connect(self.home.socket())
316 .and_then(|sock| Command::Shutdown.to_writer(sock))
317 .ok();
318
319 self.controller
320 .shutdown()
321 .map_err(|_| Error::ChannelDisconnected)
322 }
323
324 fn debug(&self) -> Result<serde_json::Value, Self::Error> {
325 let (sender, receiver) = chan::bounded(1);
326 let query: Arc<QueryState> = Arc::new(move |state| {
327 let debug = serde_json::json!({
328 "outboxSize": state.outbox().len(),
329 "fetching": state.fetching().iter().map(|(rid, state)| {
330 json!({
331 "rid": rid,
332 "from": state.from,
333 "refsAt": state.refs_at,
334 "subscribers": state.subscribers.len(),
335 })
336 }).collect::<Vec<_>>(),
337 "queue": state.sessions().values().map(|sess| {
338 json!({
339 "nid": sess.id,
340 "queue": sess.queue.iter().map(|fetch| {
341 json!({
342 "rid": fetch.rid,
343 "from": fetch.from,
344 "refsAt": fetch.refs_at,
345 })
346 }).collect::<Vec<_>>()
347 })
348 }).collect::<Vec<_>>(),
349 "rateLimiter": state.limiter().buckets.iter().map(|(host, bucket)| {
350 json!({
351 "host": host.to_string(),
352 "bucket": bucket
353 })
354 }).collect::<Vec<_>>(),
355 "events": json!({
356 "subscribers": state.emitter().subscriptions(),
357 "pending": state.emitter().pending(),
358 }),
359 "metrics": state.metrics(),
360 });
361 sender.send(debug).ok();
362
363 Ok(())
364 });
365 let (err_sender, err_receiver) = chan::bounded(1);
366 self.command(service::Command::QueryState(query, err_sender))?;
367 err_receiver.recv()??;
368
369 let debug = receiver.recv()?;
370
371 Ok(debug)
372 }
373}