use std::collections::HashSet;
use std::sync::Arc;
use p2panda_core::Topic;
use p2panda_store::{SqliteError, SqliteStore};
use ractor::{ActorRef, call, cast};
use thiserror::Error;
use tokio::sync::RwLock;
use crate::NodeId;
use crate::address_book::Builder;
use crate::address_book::actor::ToAddressBookActor;
use crate::address_book::report::ConnectionOutcome;
use crate::addrs::{NodeInfo, NodeInfoError, TransportInfo};
use crate::watchers::{UpdatesOnly, WatcherReceiver};
#[derive(Clone, Debug)]
pub struct AddressBook {
pub(super) inner: Arc<RwLock<Inner>>,
}
#[derive(Debug)]
pub(super) struct Inner {
pub(super) actor_ref: Option<ActorRef<ToAddressBookActor>>,
}
impl AddressBook {
pub(crate) fn new(actor_ref: Option<ActorRef<ToAddressBookActor>>) -> Self {
Self {
inner: Arc::new(RwLock::new(Inner { actor_ref })),
}
}
pub fn builder() -> Builder {
Builder::new()
}
pub async fn node_info(&self, node_id: NodeId) -> Result<Option<NodeInfo>, AddressBookError> {
let inner = self.inner.read().await;
let result = call!(
inner.actor_ref.as_ref().expect("actor spawned in builder"),
ToAddressBookActor::NodeInfo,
node_id
)
.map_err(Box::new)?;
Ok(result)
}
pub async fn insert_node_info(&self, node_info: NodeInfo) -> Result<bool, AddressBookError> {
let inner = self.inner.read().await;
let result = call!(
inner.actor_ref.as_ref().expect("actor spawned in builder"),
ToAddressBookActor::InsertNodeInfo,
node_info
)
.map_err(Box::new)??;
Ok(result)
}
pub async fn insert_transport_info(
&self,
node_id: NodeId,
transport_info: TransportInfo,
) -> Result<bool, AddressBookError> {
let inner = self.inner.read().await;
let result = call!(
inner.actor_ref.as_ref().expect("actor spawned in builder"),
ToAddressBookActor::InsertTransportInfo,
node_id,
transport_info
)
.map_err(Box::new)??;
Ok(result)
}
pub async fn node_infos_by_topics(
&self,
topics: impl IntoIterator<Item = Topic>,
) -> Result<Vec<NodeInfo>, AddressBookError> {
let inner = self.inner.read().await;
let result = call!(
inner.actor_ref.as_ref().expect("actor spawned in builder"),
ToAddressBookActor::NodeInfosByTopics,
topics.into_iter().collect()
)
.map_err(Box::new)?;
Ok(result)
}
pub async fn set_topics(
&self,
node_id: NodeId,
topics: impl IntoIterator<Item = Topic>,
) -> Result<(), AddressBookError> {
let inner = self.inner.read().await;
cast!(
inner.actor_ref.as_ref().expect("actor spawned in builder"),
ToAddressBookActor::SetTopics(node_id, topics.into_iter().collect())
)
.map_err(Box::new)?;
Ok(())
}
pub async fn add_topic(&self, node_id: NodeId, topic: Topic) -> Result<(), AddressBookError> {
let inner = self.inner.read().await;
cast!(
inner.actor_ref.as_ref().expect("actor spawned in builder"),
ToAddressBookActor::AddTopic(node_id, topic)
)
.map_err(Box::new)?;
Ok(())
}
pub async fn remove_topic(
&self,
node_id: NodeId,
topic: Topic,
) -> Result<(), AddressBookError> {
let inner = self.inner.read().await;
cast!(
inner.actor_ref.as_ref().expect("actor spawned in builder"),
ToAddressBookActor::RemoveTopic(node_id, topic)
)
.map_err(Box::new)?;
Ok(())
}
pub async fn watch_node_info(
&self,
node_id: NodeId,
updates_only: UpdatesOnly,
) -> Result<WatcherReceiver<Option<NodeInfo>>, AddressBookError> {
let inner = self.inner.read().await;
let result = call!(
inner.actor_ref.as_ref().expect("actor spawned in builder"),
ToAddressBookActor::WatchNodeInfo,
node_id,
updates_only
)
.map_err(Box::new)?;
Ok(result)
}
pub async fn watch_topic(
&self,
topic_id: Topic,
updates_only: UpdatesOnly,
) -> Result<WatcherReceiver<HashSet<NodeId>>, AddressBookError> {
let inner = self.inner.read().await;
let result = call!(
inner.actor_ref.as_ref().expect("actor spawned in builder"),
ToAddressBookActor::WatchTopic,
topic_id,
updates_only
)
.map_err(Box::new)?;
Ok(result)
}
pub async fn watch_node_topics(
&self,
node_id: NodeId,
updates_only: UpdatesOnly,
) -> Result<WatcherReceiver<HashSet<Topic>>, AddressBookError> {
let inner = self.inner.read().await;
let result = call!(
inner.actor_ref.as_ref().expect("actor spawned in builder"),
ToAddressBookActor::WatchNodeTopics,
node_id,
updates_only
)
.map_err(Box::new)?;
Ok(result)
}
pub async fn report(
&self,
node_id: NodeId,
connection_outcome: ConnectionOutcome,
) -> Result<(), AddressBookError> {
let inner = self.inner.read().await;
cast!(
inner.actor_ref.as_ref().expect("actor spawned in builder"),
ToAddressBookActor::Report(node_id, connection_outcome)
)
.map_err(Box::new)?;
Ok(())
}
pub(crate) async fn store(&self) -> Result<SqliteStore, AddressBookError> {
let inner = self.inner.read().await;
let result = call!(
inner.actor_ref.as_ref().expect("actor spawned in builder"),
ToAddressBookActor::Store
)
.map_err(Box::new)?;
Ok(result)
}
}
impl Drop for Inner {
fn drop(&mut self) {
if let Some(actor_ref) = self.actor_ref.take() {
actor_ref.stop(None);
}
}
}
#[derive(Debug, Error)]
pub enum AddressBookError {
#[error(transparent)]
ActorSpawn(#[from] ractor::SpawnErr),
#[cfg(feature = "supervisor")]
#[error(transparent)]
ActorLinkedSpawn(#[from] crate::supervisor::SupervisorError),
#[error(transparent)]
ActorRpc(#[from] Box<ractor::RactorErr<ToAddressBookActor>>),
#[error(transparent)]
Store(#[from] SqliteError),
#[error(transparent)]
NodeInfo(#[from] NodeInfoError),
}