// radicle_ci_broker/node_event_source.rs
use std::{
    fmt,
    path::{Path, PathBuf},
    time,
};

use radicle::{
    Profile,
    node::{Event, Handle},
};

use crate::logger;

16pub struct NodeEventSource {
18 profile_path: PathBuf,
19 events: Box<dyn Iterator<Item = Result<Event, radicle::node::Error>>>,
20}
21
22impl NodeEventSource {
23 pub fn new(profile: &Profile) -> Result<Self, NodeEventError> {
26 let socket = profile.socket();
27 if !socket.exists() {
28 return Err(NodeEventError::NoControlSocket(socket));
29 }
30 let node = radicle::Node::new(socket.clone());
31 let source = match node.subscribe(time::Duration::MAX) {
32 Ok(events) => Ok(Self {
33 profile_path: profile.home.path().into(),
34 events: Box::new(events.into_iter()),
35 }),
36 Err(err) => {
37 logger::error("failed to subscribe to node events", &err);
38 Err(NodeEventError::cannot_subscribe(&socket, err))
39 }
40 }?;
41 logger::node_event_source_created(&source);
42 Ok(source)
43 }
44
45 pub fn node_event(&mut self) -> Result<Option<Event>, NodeEventError> {
53 if let Some(event) = self.events.next() {
54 match event {
55 Ok(event) => {
56 logger::node_event_source_got_event(&event);
57 Ok(Some(event))
58 }
59 Err(radicle::node::Error::Io(err))
60 if err.kind() == std::io::ErrorKind::ConnectionReset =>
61 {
62 logger::event_disconnected();
63 Ok(None)
64 }
65 Err(err) => {
66 logger::error("error reading event from node", &err);
67 Err(NodeEventError::node(err))
68 }
69 }
70 } else {
71 logger::node_event_source_eof(self);
72 Ok(None)
73 }
74 }
75}
76
77impl fmt::Debug for NodeEventSource {
78 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79 write!(f, "NodeEventSource<path={}>", self.profile_path.display())
80 }
81}
82
83#[derive(Debug, thiserror::Error)]
85pub enum NodeEventError {
86 #[error("node control socket does not exist: {0}")]
88 NoControlSocket(PathBuf),
89
90 #[error("failed to subscribe to node events on socket {0}")]
92 CannotSubscribe(
93 PathBuf,
94 #[source] Box<dyn std::error::Error + Send + 'static>,
95 ),
96
97 #[error(transparent)]
99 Node(#[from] Box<dyn std::error::Error + Send + 'static>),
100
101 #[error("connection to the node control socket has been lost: can't continue")]
103 BrokenConnection,
104}
105
106impl NodeEventError {
107 fn cannot_subscribe(path: &Path, err: radicle::node::Error) -> Self {
108 Self::CannotSubscribe(path.into(), Box::new(err))
109 }
110
111 fn node(err: radicle::node::Error) -> Self {
112 Self::Node(Box::new(err))
113 }
114}