Skip to main content

radicle_ci_broker/
node_event_source.rs

1//! Read node events from the local node.
2
3use std::{
4    fmt,
5    path::{Path, PathBuf},
6    time,
7};
8
9use radicle::{
10    Profile,
11    node::{Event, Handle},
12};
13
14use crate::logger;
15
16/// Source of events from the local Radicle node.
17pub struct NodeEventSource {
18    profile_path: PathBuf,
19    events: Box<dyn Iterator<Item = Result<Event, radicle::node::Error>>>,
20}
21
22impl NodeEventSource {
23    /// Create a new source of node events, for a given Radicle
24    /// profile.
25    pub fn new(profile: &Profile) -> Result<Self, NodeEventError> {
26        let socket = profile.socket();
27        if !socket.exists() {
28            return Err(NodeEventError::NoControlSocket(socket));
29        }
30        let node = radicle::Node::new(socket.clone());
31        let source = match node.subscribe(time::Duration::MAX) {
32            Ok(events) => Ok(Self {
33                profile_path: profile.home.path().into(),
34                events: Box::new(events.into_iter()),
35            }),
36            Err(err) => {
37                logger::error("failed to subscribe to node events", &err);
38                Err(NodeEventError::cannot_subscribe(&socket, err))
39            }
40        }?;
41        logger::node_event_source_created(&source);
42        Ok(source)
43    }
44
45    /// Get the next node event from an event source, without
46    /// filtering. This will block until there is an event, or until
47    /// there will be no more events from this source, or there's an
48    /// error.
49    ///
50    /// A closed or broken connection to the node is not an error,
51    /// it's treated as end of file.
52    pub fn node_event(&mut self) -> Result<Option<Event>, NodeEventError> {
53        if let Some(event) = self.events.next() {
54            match event {
55                Ok(event) => {
56                    logger::node_event_source_got_event(&event);
57                    Ok(Some(event))
58                }
59                Err(radicle::node::Error::Io(err))
60                    if err.kind() == std::io::ErrorKind::ConnectionReset =>
61                {
62                    logger::event_disconnected();
63                    Ok(None)
64                }
65                Err(err) => {
66                    logger::error("error reading event from node", &err);
67                    Err(NodeEventError::node(err))
68                }
69            }
70        } else {
71            logger::node_event_source_eof(self);
72            Ok(None)
73        }
74    }
75}
76
77impl fmt::Debug for NodeEventSource {
78    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79        write!(f, "NodeEventSource<path={}>", self.profile_path.display())
80    }
81}
82
83/// Possible errors from accessing the local Radicle node.
84#[derive(Debug, thiserror::Error)]
85pub enum NodeEventError {
86    /// Node control socket does not exist.
87    #[error("node control socket does not exist: {0}")]
88    NoControlSocket(PathBuf),
89
90    /// Can't subscribe to node events.
91    #[error("failed to subscribe to node events on socket {0}")]
92    CannotSubscribe(
93        PathBuf,
94        #[source] Box<dyn std::error::Error + Send + 'static>,
95    ),
96
97    /// Some error from getting an event from the node.
98    #[error(transparent)]
99    Node(#[from] Box<dyn std::error::Error + Send + 'static>),
100
101    /// Connection to the node control socket broke.
102    #[error("connection to the node control socket has been lost: can't continue")]
103    BrokenConnection,
104}
105
106impl NodeEventError {
107    fn cannot_subscribe(path: &Path, err: radicle::node::Error) -> Self {
108        Self::CannotSubscribe(path.into(), Box::new(err))
109    }
110
111    fn node(err: radicle::node::Error) -> Self {
112        Self::Node(Box::new(err))
113    }
114}