atm0s_small_p2p/
service.rs1use tokio::sync::mpsc::{channel, Receiver, Sender};
2
3use crate::{ctx::SharedCtx, msg::P2pServiceId, router::SharedRouterTable, stream::P2pQuicStream, PeerId};
4
5pub mod alias_service;
6pub mod metrics_service;
7pub mod pubsub_service;
8pub mod visualization_service;
9
/// Capacity passed to `tokio::sync::mpsc::channel` when `P2pService::build`
/// creates the per-service event channel.
const SERVICE_CHANNEL_SIZE: usize = 10;
11
12#[derive(Debug, PartialEq, Eq)]
13pub enum P2pServiceEvent {
14 Unicast(PeerId, Vec<u8>),
15 Broadcast(PeerId, Vec<u8>),
16 Stream(PeerId, Vec<u8>, P2pQuicStream),
17}
18
19#[derive(Debug, Clone)]
20pub struct P2pServiceRequester {
21 service: P2pServiceId,
22 ctx: SharedCtx,
23}
24
25pub struct P2pService {
26 service: P2pServiceId,
27 ctx: SharedCtx,
28 rx: Receiver<P2pServiceEvent>,
29}
30
31impl P2pService {
32 pub(super) fn build(service: P2pServiceId, ctx: SharedCtx) -> (Self, Sender<P2pServiceEvent>) {
33 let (tx, rx) = channel(SERVICE_CHANNEL_SIZE);
34 (Self { service, ctx, rx }, tx)
35 }
36
37 pub fn requester(&self) -> P2pServiceRequester {
38 P2pServiceRequester {
39 service: self.service,
40 ctx: self.ctx.clone(),
41 }
42 }
43
44 pub async fn send_unicast(&self, dest: PeerId, data: Vec<u8>) -> anyhow::Result<()> {
45 self.ctx.send_unicast(self.service, dest, data).await
46 }
47
48 pub async fn send_broadcast(&self, data: Vec<u8>) {
49 self.ctx.send_broadcast(self.service, data).await
50 }
51
52 pub async fn try_send_unicast(&self, dest: PeerId, data: Vec<u8>) -> anyhow::Result<()> {
53 self.ctx.try_send_unicast(self.service, dest, data)
54 }
55
56 pub async fn try_send_broadcast(&self, data: Vec<u8>) {
57 self.ctx.try_send_broadcast(self.service, data)
58 }
59
60 pub async fn open_stream(&self, dest: PeerId, meta: Vec<u8>) -> anyhow::Result<P2pQuicStream> {
61 self.ctx.open_stream(self.service, dest, meta).await
62 }
63
64 pub fn router(&self) -> &SharedRouterTable {
65 self.ctx.router()
66 }
67
68 pub async fn recv(&mut self) -> Option<P2pServiceEvent> {
69 self.rx.recv().await
70 }
71}
72
73impl P2pServiceRequester {
74 pub async fn send_unicast(&self, dest: PeerId, data: Vec<u8>) -> anyhow::Result<()> {
75 self.ctx.send_unicast(self.service, dest, data).await
76 }
77
78 pub async fn send_broadcast(&self, data: Vec<u8>) {
79 self.ctx.send_broadcast(self.service, data).await
80 }
81
82 pub async fn try_send_unicast(&self, dest: PeerId, data: Vec<u8>) -> anyhow::Result<()> {
83 self.ctx.try_send_unicast(self.service, dest, data)
84 }
85
86 pub async fn try_send_broadcast(&self, data: Vec<u8>) {
87 self.ctx.try_send_broadcast(self.service, data)
88 }
89
90 pub async fn open_stream(&self, dest: PeerId, meta: Vec<u8>) -> anyhow::Result<P2pQuicStream> {
91 self.ctx.open_stream(self.service, dest, meta).await
92 }
93
94 pub fn router(&self) -> &SharedRouterTable {
95 self.ctx.router()
96 }
97}