onebot_api/communication/
combiner.rs1use crate::communication::utils::*;
2use crate::error::{ServiceStartError, ServiceStartResult};
3use async_trait::async_trait;
4use tokio::select;
5use tokio::sync::broadcast;
6
7pub struct SplitCombiner<S: CommunicationService, R: CommunicationService> {
27 send_side: S,
28 read_side: R,
29 event_process_sender: InternalEventSender,
30 event_process_receiver: InternalEventReceiver,
31 event_sender: Option<InternalEventSender>,
32 close_signal_sender: broadcast::Sender<()>,
33}
34
35impl<S: CommunicationService, R: CommunicationService> Drop for SplitCombiner<S, R> {
36 fn drop(&mut self) {
37 self.uninstall();
38 }
39}
40
41impl<S: CommunicationService, R: CommunicationService> SplitCombiner<S, R> {
42 pub fn new(send_side: S, read_side: R) -> Self {
43 let (event_process_sender, event_process_receiver) = flume::bounded(16);
44 let (close_signal_sender, _) = broadcast::channel(1);
45 Self {
46 send_side,
47 read_side,
48 event_process_sender,
49 event_process_receiver,
50 event_sender: None,
51 close_signal_sender,
52 }
53 }
54}
55
56#[async_trait]
57impl<S: CommunicationService, R: CommunicationService> CommunicationService
58 for SplitCombiner<S, R>
59{
60 fn install(&mut self, api_receiver: InternalAPIReceiver, event_sender: InternalEventSender) {
61 let (_, empty_api_receiver) = flume::bounded(1);
62 self
63 .send_side
64 .install(api_receiver, self.event_process_sender.clone());
65 self
66 .read_side
67 .install(empty_api_receiver, event_sender.clone());
68 self.event_sender = Some(event_sender);
69 }
70
71 fn uninstall(&mut self) {
72 self.stop();
73 self.read_side.uninstall();
74 self.send_side.uninstall();
75 self.event_sender = None;
76 }
77
78 fn stop(&self) {
79 let _ = self.close_signal_sender.send(());
80 self.send_side.stop();
81 self.read_side.stop();
82 }
83
84 async fn start(&self) -> ServiceStartResult<()> {
85 async fn processor(
86 mut close_signal: broadcast::Receiver<()>,
87 event_process_receiver: InternalEventReceiver,
88 event_sender: InternalEventSender,
89 ) -> anyhow::Result<()> {
90 loop {
91 select! {
92 _ = close_signal.recv() => return Err(anyhow::anyhow!("close")),
93 Ok(data) = event_process_receiver.recv_async() => {
94 if data.is_api_response() {
95 let _ = event_sender.send(data);
96 }
97 }
98 }
99 }
100 }
101
102 if self.event_sender.is_none() {
103 return Err(ServiceStartError::NotInjectedEventSender);
104 }
105 let event_sender = self.event_sender.clone().unwrap();
106
107 tokio::spawn(processor(
108 self.close_signal_sender.subscribe(),
109 self.event_process_receiver.clone(),
110 event_sender,
111 ));
112
113 futures::try_join!(self.send_side.start(), self.read_side.start())?;
114 Ok(())
115 }
116
117 async fn restart(&self) -> ServiceStartResult<()> {
118 futures::try_join!(self.send_side.restart(), self.read_side.restart())?;
119 Ok(())
120 }
121}
122
123pub struct BothEventCombiner<S: CommunicationService, R: CommunicationService> {
128 send_side: S,
129 read_side: R,
130}
131
132impl<S: CommunicationService, R: CommunicationService> BothEventCombiner<S, R> {
133 pub fn new(send_side: S, read_side: R) -> Self {
134 Self {
135 send_side,
136 read_side,
137 }
138 }
139}
140
141impl<S: CommunicationService, R: CommunicationService> Drop for BothEventCombiner<S, R> {
142 fn drop(&mut self) {}
143}
144
145#[async_trait]
146impl<S: CommunicationService, R: CommunicationService> CommunicationService
147 for BothEventCombiner<S, R>
148{
149 fn install(&mut self, api_receiver: InternalAPIReceiver, event_sender: InternalEventSender) {
150 let (_, empty_api_receiver) = flume::bounded(1);
151 self.send_side.install(api_receiver, event_sender.clone());
152 self.read_side.install(empty_api_receiver, event_sender);
153 }
154
155 fn uninstall(&mut self) {
156 self.send_side.uninstall();
157 self.read_side.uninstall();
158 }
159
160 fn stop(&self) {
161 self.send_side.stop();
162 self.read_side.stop();
163 }
164
165 async fn start(&self) -> ServiceStartResult<()> {
166 futures::try_join!(self.send_side.start(), self.read_side.start())?;
167 Ok(())
168 }
169
170 async fn restart(&self) -> ServiceStartResult<()> {
171 futures::try_join!(self.send_side.restart(), self.read_side.restart())?;
172 Ok(())
173 }
174}