ankurah_core/
connector.rs

1use ankurah_proto::{self as proto, Attested, EntityState};
2use async_trait::async_trait;
3
4use crate::{policy::PolicyAgent, storage::StorageEngine, Node};
5
6// TODO redesign this such that:
7// - the sender and receiver are disconnected at the same time
8// - a connection id or dyn Ord/Eq/Hash is used to identify the connection for deregistration
9//   so that we can have multiple connections to the same node without things getting mixed up
10
/// Sending side of a connection to a peer node.
///
/// Implementors must be `Send + Sync`. Every method takes `&self`, and the
/// trait is passed around as `Box<dyn PeerSender>` (see
/// `NodeComms::register_peer`), so `Clone` cannot be required directly;
/// [`PeerSender::cloned`] is provided for duplication instead.
#[async_trait]
pub trait PeerSender: Send + Sync {
    /// Sends `message` to the peer, returning a [`SendError`] on failure.
    ///
    /// This is a synchronous call. NOTE(review): whether it blocks or merely
    /// enqueues the message is up to the implementor - confirm per transport.
    fn send_message(&self, message: proto::NodeMessage) -> Result<(), SendError>;
    /// The node ID of the recipient of this message
    fn recipient_node_id(&self) -> proto::EntityId;
    /// Returns a new boxed sender; presumably a handle equivalent to `self`.
    fn cloned(&self) -> Box<dyn PeerSender>;
}
18
/// Error type returned by [`PeerSender::send_message`].
///
/// `Display` output for each variant comes from its `#[error("...")]` attribute.
#[derive(Debug, thiserror::Error)]
pub enum SendError {
    /// The connection was closed.
    #[error("Connection closed")]
    ConnectionClosed,
    /// The send timed out.
    #[error("Send timeout")]
    Timeout,
    /// Any other failure, wrapping an `anyhow::Error`.
    ///
    /// `#[from]` generates `From<anyhow::Error>`, so `?` converts
    /// `anyhow::Error` values into this variant automatically.
    #[error("Other error: {0}")]
    Other(#[from] anyhow::Error),
    /// A failure for which no further detail is carried.
    #[error("Unknown error")]
    Unknown,
}
30
/// Object-safe interface to a node, usable as `Box<dyn NodeComms>`.
///
/// Implemented in this file for `Node<SE, PA>`, which lets callers work with a
/// node without naming its storage-engine and policy-agent type parameters.
/// NOTE(review): presumably consumed by connector/transport implementations -
/// confirm against callers.
#[async_trait]
pub trait NodeComms: Send + Sync {
    /// Returns this node's ID.
    fn id(&self) -> proto::EntityId;
    /// Returns the node's `durable` flag.
    fn durable(&self) -> bool;
    /// Returns the attested system root state, or `None` when there is none.
    fn system_root(&self) -> Option<Attested<EntityState>>;
    /// Registers a peer described by `presence`; `sender` is the
    /// [`PeerSender`] handed to the node for that peer.
    fn register_peer(&self, presence: proto::Presence, sender: Box<dyn PeerSender>);
    /// Deregisters the peer identified by `node_id`.
    fn deregister_peer(&self, node_id: proto::EntityId);
    /// Asynchronously handles `message`, returning any error from processing it.
    async fn handle_message(&self, message: proto::NodeMessage) -> anyhow::Result<()>;
    /// Returns a new boxed handle; `Clone` cannot be required on a trait object.
    fn cloned(&self) -> Box<dyn NodeComms>;
}
41
42#[async_trait]
43impl<SE: StorageEngine + Send + Sync + 'static, PA: PolicyAgent + Send + Sync + 'static> NodeComms for Node<SE, PA> {
44    fn id(&self) -> proto::EntityId { self.id }
45    fn durable(&self) -> bool { self.durable }
46    fn system_root(&self) -> Option<Attested<EntityState>> { self.system.root() }
47    fn register_peer(&self, presence: proto::Presence, sender: Box<dyn PeerSender>) {
48        //
49        self.register_peer(presence, sender);
50    }
51    fn deregister_peer(&self, node_id: proto::EntityId) {
52        //
53        self.deregister_peer(node_id);
54    }
55    async fn handle_message(&self, message: proto::NodeMessage) -> anyhow::Result<()> {
56        //
57        self.handle_message(message).await
58    }
59    fn cloned(&self) -> Box<dyn NodeComms> { Box::new(self.clone()) }
60}