onebot_api/communication/
combiner.rs1use crate::communication::utils::*;
2use async_trait::async_trait;
3use tokio::select;
4use tokio::sync::broadcast;
5
6pub struct SplitCombiner<S: CommunicationService, R: CommunicationService> {
26 send_side: S,
27 read_side: R,
28 event_process_sender: EventSender,
29 event_sender: Option<EventSender>,
30 close_signal_sender: broadcast::Sender<()>,
31}
32
33impl<S: CommunicationService, R: CommunicationService> Drop for SplitCombiner<S, R> {
34 fn drop(&mut self) {
35 let _ = self.close_signal_sender.send(());
36 }
37}
38
39impl<S: CommunicationService, R: CommunicationService> SplitCombiner<S, R> {
40 pub fn new(send_side: S, read_side: R) -> Self {
41 let (event_process_sender, _) = broadcast::channel(16);
42 let (close_signal_sender, _) = broadcast::channel(1);
43 Self {
44 send_side,
45 read_side,
46 event_process_sender,
47 event_sender: None,
48 close_signal_sender,
49 }
50 }
51}
52
53#[async_trait]
54impl<S: CommunicationService, R: CommunicationService> CommunicationService
55 for SplitCombiner<S, R>
56{
57 fn inject(&mut self, api_receiver: APIReceiver, event_sender: EventSender) {
58 let (_, empty_api_receiver) = flume::bounded(1);
59 self
60 .send_side
61 .inject(api_receiver, self.event_process_sender.clone());
62 self
63 .read_side
64 .inject(empty_api_receiver, event_sender.clone());
65 self.event_sender = Some(event_sender);
66 }
67
68 async fn start_service(&self) -> anyhow::Result<()> {
69 async fn processor(
70 mut close_signal: broadcast::Receiver<()>,
71 mut event_process_receiver: EventReceiver,
72 event_sender: EventSender,
73 ) -> anyhow::Result<()> {
74 loop {
75 select! {
76 _ = close_signal.recv() => return Err(anyhow::anyhow!("close")),
77 Ok(data) = event_process_receiver.recv() => {
78 if data.is_api_response() {
79 let _ = event_sender.send(data);
80 }
81 }
82 }
83 }
84 }
85
86 if self.event_sender.is_none() {
87 return Err(anyhow::anyhow!("event sender is none"));
88 }
89 let event_sender = self.event_sender.clone().unwrap();
90
91 tokio::spawn(processor(
92 self.close_signal_sender.subscribe(),
93 self.event_process_sender.subscribe(),
94 event_sender,
95 ));
96
97 futures::try_join!(
98 self.send_side.start_service(),
99 self.read_side.start_service()
100 )?;
101 Ok(())
102 }
103}
104
105pub struct BothEventCombiner<S: CommunicationService, R: CommunicationService> {
110 send_side: S,
111 read_side: R,
112}
113
114impl<S: CommunicationService, R: CommunicationService> BothEventCombiner<S, R> {
115 pub fn new(send_side: S, read_side: R) -> Self {
116 Self {
117 send_side,
118 read_side,
119 }
120 }
121}
122
123#[async_trait]
124impl<S: CommunicationService, R: CommunicationService> CommunicationService
125 for BothEventCombiner<S, R>
126{
127 fn inject(&mut self, api_receiver: APIReceiver, event_sender: EventSender) {
128 let (_, empty_api_receiver) = flume::bounded(1);
129 self.send_side.inject(api_receiver, event_sender.clone());
130 self.read_side.inject(empty_api_receiver, event_sender);
131 }
132
133 async fn start_service(&self) -> anyhow::Result<()> {
134 futures::try_join!(
135 self.send_side.start_service(),
136 self.read_side.start_service()
137 )?;
138 Ok(())
139 }
140}