1use std::net;
2use std::os::unix::net::UnixStream;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::{fmt, io, time};
6
7use crossbeam_channel as chan;
8use radicle::node::{ConnectOptions, ConnectResult, Seeds};
9use radicle::storage::refs::RefsAt;
10use reactor::poller::popol::PopolWaker;
11use serde_json::json;
12use thiserror::Error;
13
14use crate::identity::RepoId;
15use crate::node::{Alias, Command, FetchResult};
16use crate::profile::Home;
17use crate::runtime::Emitter;
18use crate::service;
19use crate::service::policy;
20use crate::service::NodeId;
21use crate::service::{CommandError, Config, QueryState};
22use crate::service::{Event, Events};
23use crate::wire;
24use crate::wire::StreamId;
25use crate::worker::TaskResult;
26
27#[derive(Error, Debug)]
29pub enum Error {
30 #[error("command channel is not connected")]
32 ChannelDisconnected,
33 #[error("command failed: {0}")]
35 Command(#[from] CommandError),
36 #[error("the operation timed out")]
38 Timeout,
39 #[error(transparent)]
41 Io(#[from] std::io::Error),
42}
43
44impl From<chan::RecvError> for Error {
45 fn from(_: chan::RecvError) -> Self {
46 Self::ChannelDisconnected
47 }
48}
49
50impl From<chan::RecvTimeoutError> for Error {
51 fn from(err: chan::RecvTimeoutError) -> Self {
52 match err {
53 chan::RecvTimeoutError::Timeout => Self::Timeout,
54 chan::RecvTimeoutError::Disconnected => Self::ChannelDisconnected,
55 }
56 }
57}
58
59impl<T> From<chan::SendError<T>> for Error {
60 fn from(_: chan::SendError<T>) -> Self {
61 Self::ChannelDisconnected
62 }
63}
64
65pub struct Handle {
66 pub(crate) home: Home,
67 pub(crate) controller: reactor::Controller<wire::Control, PopolWaker>,
68
69 shutdown: Arc<AtomicBool>,
71 emitter: Emitter<Event>,
73}
74
75impl Handle {
76 pub fn events(&self) -> Events {
78 Events::from(self.emitter.subscribe())
79 }
80}
81
82impl fmt::Debug for Handle {
83 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84 f.debug_struct("Handle").field("home", &self.home).finish()
85 }
86}
87
88impl Clone for Handle {
89 fn clone(&self) -> Self {
90 Self {
91 home: self.home.clone(),
92 controller: self.controller.clone(),
93 shutdown: self.shutdown.clone(),
94 emitter: self.emitter.clone(),
95 }
96 }
97}
98
99impl Handle {
100 pub fn new(
101 home: Home,
102 controller: reactor::Controller<wire::Control, PopolWaker>,
103 emitter: Emitter<Event>,
104 ) -> Self {
105 Self {
106 home,
107 controller,
108 shutdown: Arc::default(),
109 emitter,
110 }
111 }
112
113 pub fn worker_result(&mut self, result: TaskResult) -> Result<(), io::Error> {
114 self.controller.cmd(wire::Control::Worker(result))
115 }
116
117 pub fn flush(&mut self, remote: NodeId, stream: StreamId) -> Result<(), io::Error> {
118 self.controller.cmd(wire::Control::Flush { remote, stream })
119 }
120
121 pub(crate) fn command(&self, cmd: service::Command) -> Result<(), io::Error> {
122 self.controller.cmd(wire::Control::User(cmd))
123 }
124}
125
126impl radicle::node::Handle for Handle {
127 type Sessions = Vec<radicle::node::Session>;
128 type Events = Events;
129 type Event = Event;
130 type Error = Error;
131
132 fn nid(&self) -> Result<NodeId, Self::Error> {
133 let (sender, receiver) = chan::bounded(1);
134 let query: Arc<QueryState> = Arc::new(move |state| {
135 sender.send(*state.nid()).ok();
136 Ok(())
137 });
138 let (err_sender, err_receiver) = chan::bounded(1);
139 self.command(service::Command::QueryState(query, err_sender))?;
140 err_receiver.recv()??;
141
142 let nid = receiver.recv()?;
143
144 Ok(nid)
145 }
146
147 fn is_running(&self) -> bool {
148 true
149 }
150
151 fn connect(
152 &mut self,
153 node: NodeId,
154 addr: radicle::node::Address,
155 opts: ConnectOptions,
156 ) -> Result<ConnectResult, Error> {
157 let events = self.events();
158 let timeout = opts.timeout;
159 let sessions = self.sessions()?;
160 let session = sessions.iter().find(|s| s.nid == node);
161
162 if let Some(s) = session {
163 if s.state.is_connected() {
164 return Ok(ConnectResult::Connected);
165 }
166 }
167 self.command(service::Command::Connect(node, addr, opts))?;
168
169 events
170 .wait(
171 |e| match e {
172 Event::PeerConnected { nid } if nid == &node => Some(ConnectResult::Connected),
173 Event::PeerDisconnected { nid, reason } if nid == &node => {
174 Some(ConnectResult::Disconnected {
175 reason: reason.clone(),
176 })
177 }
178 _ => None,
179 },
180 timeout,
181 )
182 .map_err(Error::from)
183 }
184
185 fn disconnect(&mut self, node: NodeId) -> Result<(), Self::Error> {
186 let events = self.events();
187 self.command(service::Command::Disconnect(node))?;
188 events
189 .wait(
190 |e| match e {
191 Event::PeerDisconnected { nid, .. } if nid == &node => Some(()),
192 _ => None,
193 },
194 time::Duration::MAX,
195 )
196 .map_err(Error::from)
197 }
198
199 fn seeds(&mut self, id: RepoId) -> Result<Seeds, Self::Error> {
200 let (sender, receiver) = chan::bounded(1);
201 self.command(service::Command::Seeds(id, sender))?;
202 receiver.recv().map_err(Error::from)
203 }
204
205 fn config(&self) -> Result<Config, Self::Error> {
206 let (sender, receiver) = chan::bounded(1);
207 self.command(service::Command::Config(sender))?;
208 receiver.recv().map_err(Error::from)
209 }
210
211 fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Self::Error> {
212 let (sender, receiver) = chan::bounded(1);
213 self.command(service::Command::ListenAddrs(sender))?;
214 receiver.recv().map_err(Error::from)
215 }
216
217 fn fetch(
218 &mut self,
219 id: RepoId,
220 from: NodeId,
221 timeout: time::Duration,
222 ) -> Result<FetchResult, Error> {
223 let (sender, receiver) = chan::bounded(1);
224 self.command(service::Command::Fetch(id, from, timeout, sender))?;
225 receiver.recv().map_err(Error::from)
226 }
227
228 fn follow(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
229 let (sender, receiver) = chan::bounded(1);
230 self.command(service::Command::Follow(id, alias, sender))?;
231 receiver.recv().map_err(Error::from)
232 }
233
234 fn unfollow(&mut self, id: NodeId) -> Result<bool, Error> {
235 let (sender, receiver) = chan::bounded(1);
236 self.command(service::Command::Unfollow(id, sender))?;
237 receiver.recv().map_err(Error::from)
238 }
239
240 fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Error> {
241 let (sender, receiver) = chan::bounded(1);
242 self.command(service::Command::Seed(id, scope, sender))?;
243 receiver.recv().map_err(Error::from)
244 }
245
246 fn unseed(&mut self, id: RepoId) -> Result<bool, Error> {
247 let (sender, receiver) = chan::bounded(1);
248 self.command(service::Command::Unseed(id, sender))?;
249 receiver.recv().map_err(Error::from)
250 }
251
252 fn announce_refs(&mut self, id: RepoId) -> Result<RefsAt, Error> {
253 let (sender, receiver) = chan::bounded(1);
254 self.command(service::Command::AnnounceRefs(id, sender))?;
255 receiver.recv().map_err(Error::from)
256 }
257
258 fn announce_inventory(&mut self) -> Result<(), Error> {
259 self.command(service::Command::AnnounceInventory)
260 .map_err(Error::from)
261 }
262
263 fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
264 let (sender, receiver) = chan::bounded(1);
265 self.command(service::Command::AddInventory(rid, sender))?;
266 receiver.recv().map_err(Error::from)
267 }
268
269 fn subscribe(&self, _timeout: time::Duration) -> Result<Self::Events, Self::Error> {
270 Ok(self.events())
271 }
272
273 fn sessions(&self) -> Result<Self::Sessions, Error> {
274 let (sender, receiver) = chan::unbounded();
275 let query: Arc<QueryState> = Arc::new(move |state| {
276 let sessions = state
277 .sessions()
278 .iter()
279 .map(|(_, s)| radicle::node::Session::from(s))
280 .collect();
281 sender.send(sessions).ok();
282
283 Ok(())
284 });
285 let (err_sender, err_receiver) = chan::bounded(1);
286 self.command(service::Command::QueryState(query, err_sender))?;
287 err_receiver.recv()??;
288
289 let sessions = receiver.recv()?;
290
291 Ok(sessions)
292 }
293
294 fn session(&self, nid: NodeId) -> Result<Option<radicle::node::Session>, Self::Error> {
295 let (sender, receiver) = chan::bounded(1);
296 let query: Arc<QueryState> = Arc::new(move |state| {
297 let session = state.sessions().get(&nid).map(radicle::node::Session::from);
298 sender.send(session).ok();
299
300 Ok(())
301 });
302 let (err_sender, err_receiver) = chan::bounded(1);
303 self.command(service::Command::QueryState(query, err_sender))?;
304 err_receiver.recv()??;
305
306 let sessions = receiver.recv()?;
307
308 Ok(sessions)
309 }
310
311 fn shutdown(self) -> Result<(), Error> {
312 if self
314 .shutdown
315 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
316 .is_err()
317 {
318 return Ok(());
319 }
320 UnixStream::connect(self.home.socket())
324 .and_then(|sock| Command::Shutdown.to_writer(sock))
325 .ok();
326
327 self.controller
328 .shutdown()
329 .map_err(|_| Error::ChannelDisconnected)
330 }
331
332 fn debug(&self) -> Result<serde_json::Value, Self::Error> {
333 let (sender, receiver) = chan::bounded(1);
334 let query: Arc<QueryState> = Arc::new(move |state| {
335 let debug = serde_json::json!({
336 "outboxSize": state.outbox().len(),
337 "fetching": state.fetching().iter().map(|(rid, state)| {
338 json!({
339 "rid": rid,
340 "from": state.from,
341 "refsAt": state.refs_at,
342 "subscribers": state.subscribers.len(),
343 })
344 }).collect::<Vec<_>>(),
345 "queue": state.sessions().values().map(|sess| {
346 json!({
347 "nid": sess.id,
348 "queue": sess.queue.iter().map(|fetch| {
349 json!({
350 "rid": fetch.rid,
351 "from": fetch.from,
352 "refsAt": fetch.refs_at,
353 })
354 }).collect::<Vec<_>>()
355 })
356 }).collect::<Vec<_>>(),
357 "rateLimiter": state.limiter().buckets.iter().map(|(host, bucket)| {
358 json!({
359 "host": host.to_string(),
360 "bucket": bucket
361 })
362 }).collect::<Vec<_>>(),
363 "events": json!({
364 "subscribers": state.emitter().subscriptions(),
365 "pending": state.emitter().pending(),
366 }),
367 "metrics": state.metrics(),
368 });
369 sender.send(debug).ok();
370
371 Ok(())
372 });
373 let (err_sender, err_receiver) = chan::bounded(1);
374 self.command(service::Command::QueryState(query, err_sender))?;
375 err_receiver.recv()??;
376
377 let debug = receiver.recv()?;
378
379 Ok(debug)
380 }
381}