atm0s_small_p2p/
service.rs

1use tokio::sync::mpsc::{channel, Receiver, Sender};
2
3use crate::{ctx::SharedCtx, msg::P2pServiceId, router::SharedRouterTable, stream::P2pQuicStream, PeerId};
4
5pub mod alias_service;
6pub mod metrics_service;
7pub mod pubsub_service;
8pub mod visualization_service;
9
/// Capacity passed to the bounded tokio mpsc channel created in `P2pService::build`,
/// i.e. how many `P2pServiceEvent`s can be queued for a service before senders must wait.
const SERVICE_CHANNEL_SIZE: usize = 10;
11
/// Event delivered to a [`P2pService`] through its channel and returned by
/// `P2pService::recv`.
///
/// NOTE(review): the meaning of each payload field is not visible in this file; the
/// descriptions below are assumptions to confirm against the code that produces events.
#[derive(Debug, PartialEq, Eq)]
pub enum P2pServiceEvent {
    /// A unicast message: presumably the originating peer and the raw payload.
    Unicast(PeerId, Vec<u8>),
    /// A broadcast message: presumably the originating peer and the raw payload.
    Broadcast(PeerId, Vec<u8>),
    /// An incoming stream: presumably the originating peer, the metadata bytes supplied
    /// when the stream was opened, and the stream itself.
    Stream(PeerId, Vec<u8>, P2pQuicStream),
}
18
/// Cloneable sending handle for a service.
///
/// Holds only the service id and the shared context, so it has no receiving side;
/// obtain one from `P2pService::requester`.
#[derive(Debug, Clone)]
pub struct P2pServiceRequester {
    /// Id of the service this handle sends on behalf of.
    service: P2pServiceId,
    /// Shared context that every call is forwarded to.
    ctx: SharedCtx,
}
24
/// Service handle that can both send through the shared context and receive
/// [`P2pServiceEvent`]s from its channel.
pub struct P2pService {
    /// Id of the service this handle represents.
    service: P2pServiceId,
    /// Shared context that every outbound call is forwarded to.
    ctx: SharedCtx,
    /// Receiving half of the event channel created in `build`.
    rx: Receiver<P2pServiceEvent>,
}
30
31impl P2pService {
32    pub(super) fn build(service: P2pServiceId, ctx: SharedCtx) -> (Self, Sender<P2pServiceEvent>) {
33        let (tx, rx) = channel(SERVICE_CHANNEL_SIZE);
34        (Self { service, ctx, rx }, tx)
35    }
36
37    pub fn requester(&self) -> P2pServiceRequester {
38        P2pServiceRequester {
39            service: self.service,
40            ctx: self.ctx.clone(),
41        }
42    }
43
44    pub async fn send_unicast(&self, dest: PeerId, data: Vec<u8>) -> anyhow::Result<()> {
45        self.ctx.send_unicast(self.service, dest, data).await
46    }
47
48    pub async fn send_broadcast(&self, data: Vec<u8>) {
49        self.ctx.send_broadcast(self.service, data).await
50    }
51
52    pub async fn try_send_unicast(&self, dest: PeerId, data: Vec<u8>) -> anyhow::Result<()> {
53        self.ctx.try_send_unicast(self.service, dest, data)
54    }
55
56    pub async fn try_send_broadcast(&self, data: Vec<u8>) {
57        self.ctx.try_send_broadcast(self.service, data)
58    }
59
60    pub async fn open_stream(&self, dest: PeerId, meta: Vec<u8>) -> anyhow::Result<P2pQuicStream> {
61        self.ctx.open_stream(self.service, dest, meta).await
62    }
63
64    pub fn router(&self) -> &SharedRouterTable {
65        self.ctx.router()
66    }
67
68    pub async fn recv(&mut self) -> Option<P2pServiceEvent> {
69        self.rx.recv().await
70    }
71}
72
73impl P2pServiceRequester {
74    pub async fn send_unicast(&self, dest: PeerId, data: Vec<u8>) -> anyhow::Result<()> {
75        self.ctx.send_unicast(self.service, dest, data).await
76    }
77
78    pub async fn send_broadcast(&self, data: Vec<u8>) {
79        self.ctx.send_broadcast(self.service, data).await
80    }
81
82    pub async fn try_send_unicast(&self, dest: PeerId, data: Vec<u8>) -> anyhow::Result<()> {
83        self.ctx.try_send_unicast(self.service, dest, data)
84    }
85
86    pub async fn try_send_broadcast(&self, data: Vec<u8>) {
87        self.ctx.try_send_broadcast(self.service, data)
88    }
89
90    pub async fn open_stream(&self, dest: PeerId, meta: Vec<u8>) -> anyhow::Result<P2pQuicStream> {
91        self.ctx.open_stream(self.service, dest, meta).await
92    }
93
94    pub fn router(&self) -> &SharedRouterTable {
95        self.ctx.router()
96    }
97}