p2panda_net/address_book/
api.rs1use std::collections::HashSet;
4use std::sync::Arc;
5
6use p2panda_core::Topic;
7use p2panda_store::{SqliteError, SqliteStore};
8use ractor::{ActorRef, call, cast};
9use thiserror::Error;
10use tokio::sync::RwLock;
11
12use crate::NodeId;
13use crate::address_book::Builder;
14use crate::address_book::actor::ToAddressBookActor;
15use crate::address_book::report::ConnectionOutcome;
16use crate::addrs::{NodeInfo, NodeInfoError, TransportInfo};
17use crate::watchers::{UpdatesOnly, WatcherReceiver};
18
19#[derive(Clone, Debug)]
65pub struct AddressBook {
66 pub(super) inner: Arc<RwLock<Inner>>,
67}
68
69#[derive(Debug)]
70pub(super) struct Inner {
71 pub(super) actor_ref: Option<ActorRef<ToAddressBookActor>>,
72}
73
74impl AddressBook {
75 pub(crate) fn new(actor_ref: Option<ActorRef<ToAddressBookActor>>) -> Self {
76 Self {
77 inner: Arc::new(RwLock::new(Inner { actor_ref })),
78 }
79 }
80
81 pub fn builder() -> Builder {
82 Builder::new()
83 }
84
85 pub async fn node_info(&self, node_id: NodeId) -> Result<Option<NodeInfo>, AddressBookError> {
89 let inner = self.inner.read().await;
90 let result = call!(
91 inner.actor_ref.as_ref().expect("actor spawned in builder"),
92 ToAddressBookActor::NodeInfo,
93 node_id
94 )
95 .map_err(Box::new)?;
96 Ok(result)
97 }
98
99 pub async fn insert_node_info(&self, node_info: NodeInfo) -> Result<bool, AddressBookError> {
108 let inner = self.inner.read().await;
109 let result = call!(
110 inner.actor_ref.as_ref().expect("actor spawned in builder"),
111 ToAddressBookActor::InsertNodeInfo,
112 node_info
113 )
114 .map_err(Box::new)??;
115 Ok(result)
116 }
117
118 pub async fn insert_transport_info(
131 &self,
132 node_id: NodeId,
133 transport_info: TransportInfo,
134 ) -> Result<bool, AddressBookError> {
135 let inner = self.inner.read().await;
136 let result = call!(
137 inner.actor_ref.as_ref().expect("actor spawned in builder"),
138 ToAddressBookActor::InsertTransportInfo,
139 node_id,
140 transport_info
141 )
142 .map_err(Box::new)??;
143 Ok(result)
144 }
145
146 pub async fn node_infos_by_topics(
147 &self,
148 topics: impl IntoIterator<Item = Topic>,
149 ) -> Result<Vec<NodeInfo>, AddressBookError> {
150 let inner = self.inner.read().await;
151 let result = call!(
152 inner.actor_ref.as_ref().expect("actor spawned in builder"),
153 ToAddressBookActor::NodeInfosByTopics,
154 topics.into_iter().collect()
155 )
156 .map_err(Box::new)?;
157 Ok(result)
158 }
159
160 pub async fn set_topics(
161 &self,
162 node_id: NodeId,
163 topics: impl IntoIterator<Item = Topic>,
164 ) -> Result<(), AddressBookError> {
165 let inner = self.inner.read().await;
166 cast!(
167 inner.actor_ref.as_ref().expect("actor spawned in builder"),
168 ToAddressBookActor::SetTopics(node_id, topics.into_iter().collect())
169 )
170 .map_err(Box::new)?;
171 Ok(())
172 }
173
174 pub async fn add_topic(&self, node_id: NodeId, topic: Topic) -> Result<(), AddressBookError> {
175 let inner = self.inner.read().await;
176 cast!(
177 inner.actor_ref.as_ref().expect("actor spawned in builder"),
178 ToAddressBookActor::AddTopic(node_id, topic)
179 )
180 .map_err(Box::new)?;
181 Ok(())
182 }
183
184 pub async fn remove_topic(
185 &self,
186 node_id: NodeId,
187 topic: Topic,
188 ) -> Result<(), AddressBookError> {
189 let inner = self.inner.read().await;
190 cast!(
191 inner.actor_ref.as_ref().expect("actor spawned in builder"),
192 ToAddressBookActor::RemoveTopic(node_id, topic)
193 )
194 .map_err(Box::new)?;
195 Ok(())
196 }
197
198 pub async fn watch_node_info(
200 &self,
201 node_id: NodeId,
202 updates_only: UpdatesOnly,
203 ) -> Result<WatcherReceiver<Option<NodeInfo>>, AddressBookError> {
204 let inner = self.inner.read().await;
205 let result = call!(
206 inner.actor_ref.as_ref().expect("actor spawned in builder"),
207 ToAddressBookActor::WatchNodeInfo,
208 node_id,
209 updates_only
210 )
211 .map_err(Box::new)?;
212 Ok(result)
213 }
214
215 pub async fn watch_topic(
217 &self,
218 topic_id: Topic,
219 updates_only: UpdatesOnly,
220 ) -> Result<WatcherReceiver<HashSet<NodeId>>, AddressBookError> {
221 let inner = self.inner.read().await;
222 let result = call!(
223 inner.actor_ref.as_ref().expect("actor spawned in builder"),
224 ToAddressBookActor::WatchTopic,
225 topic_id,
226 updates_only
227 )
228 .map_err(Box::new)?;
229 Ok(result)
230 }
231
232 pub async fn watch_node_topics(
234 &self,
235 node_id: NodeId,
236 updates_only: UpdatesOnly,
237 ) -> Result<WatcherReceiver<HashSet<Topic>>, AddressBookError> {
238 let inner = self.inner.read().await;
239 let result = call!(
240 inner.actor_ref.as_ref().expect("actor spawned in builder"),
241 ToAddressBookActor::WatchNodeTopics,
242 node_id,
243 updates_only
244 )
245 .map_err(Box::new)?;
246 Ok(result)
247 }
248
249 pub async fn report(
253 &self,
254 node_id: NodeId,
255 connection_outcome: ConnectionOutcome,
256 ) -> Result<(), AddressBookError> {
257 let inner = self.inner.read().await;
258 cast!(
259 inner.actor_ref.as_ref().expect("actor spawned in builder"),
260 ToAddressBookActor::Report(node_id, connection_outcome)
261 )
262 .map_err(Box::new)?;
263 Ok(())
264 }
265
266 pub(crate) async fn store(&self) -> Result<SqliteStore, AddressBookError> {
267 let inner = self.inner.read().await;
268 let result = call!(
269 inner.actor_ref.as_ref().expect("actor spawned in builder"),
270 ToAddressBookActor::Store
271 )
272 .map_err(Box::new)?;
273 Ok(result)
274 }
275}
276
277impl Drop for Inner {
278 fn drop(&mut self) {
279 if let Some(actor_ref) = self.actor_ref.take() {
280 actor_ref.stop(None);
281 }
282 }
283}
284
285#[derive(Debug, Error)]
286pub enum AddressBookError {
287 #[error(transparent)]
289 ActorSpawn(#[from] ractor::SpawnErr),
290
291 #[cfg(feature = "supervisor")]
293 #[error(transparent)]
294 ActorLinkedSpawn(#[from] crate::supervisor::SupervisorError),
295
296 #[error(transparent)]
298 ActorRpc(#[from] Box<ractor::RactorErr<ToAddressBookActor>>),
299
300 #[error(transparent)]
302 Store(#[from] SqliteError),
303
304 #[error(transparent)]
306 NodeInfo(#[from] NodeInfoError),
307}