ankurah_core/
connector.rs

1use ankurah_proto as proto;
2use async_trait::async_trait;
3
4use crate::{node::NodeInner, policy::PolicyAgent, storage::StorageEngine, Node};
5
6// TODO redesign this such that:
7// - the sender and receiver are disconnected at the same time
8// - a connection id or dyn Ord/Eq/Hash is used to identify the connection for deregistration
9//   so that we can have multiple connections to the same node without things getting mixed up
10
11#[async_trait]
12pub trait PeerSender: Send + Sync {
13    async fn send_message(&self, message: proto::NodeMessage) -> Result<(), SendError>;
14    /// The node ID of the recipient of this message
15    fn recipient_node_id(&self) -> proto::NodeId;
16    fn cloned(&self) -> Box<dyn PeerSender>;
17}
18
19#[derive(Debug, thiserror::Error)]
20pub enum SendError {
21    #[error("Connection closed")]
22    ConnectionClosed,
23    #[error("Send timeout")]
24    Timeout,
25    #[error("Other error: {0}")]
26    Other(#[from] anyhow::Error),
27    #[error("Unknown error")]
28    Unknown,
29}
30
31#[async_trait]
32pub trait NodeComms: Send + Sync {
33    fn id(&self) -> proto::NodeId;
34    fn durable(&self) -> bool;
35    fn register_peer(&self, presence: proto::Presence, sender: Box<dyn PeerSender>);
36    fn deregister_peer(&self, node_id: proto::NodeId);
37    async fn handle_message(&self, message: proto::NodeMessage) -> anyhow::Result<()>;
38    fn cloned(&self) -> Box<dyn NodeComms>;
39}
40
41#[async_trait]
42impl<SE: StorageEngine + Send + Sync + 'static, PA: PolicyAgent + Send + Sync + 'static> NodeComms for Node<SE, PA> {
43    fn id(&self) -> proto::NodeId { self.id.clone() }
44    fn durable(&self) -> bool { self.durable }
45    fn register_peer(&self, presence: proto::Presence, sender: Box<dyn PeerSender>) {
46        //
47        NodeInner::register_peer(&self, presence, sender);
48    }
49    fn deregister_peer(&self, node_id: proto::NodeId) {
50        //
51        NodeInner::deregister_peer(&self, node_id);
52    }
53    async fn handle_message(&self, message: proto::NodeMessage) -> anyhow::Result<()> {
54        //
55        NodeInner::handle_message(&self, message).await
56    }
57    fn cloned(&self) -> Box<dyn NodeComms> { Box::new(self.clone()) }
58}