onebot_api/communication/
combiner.rs1use crate::communication::utils::*;
2use crate::error::{ServiceStartError, ServiceStartResult};
3use async_trait::async_trait;
4use tokio::select;
5use tokio::sync::broadcast;
6
7pub struct SplitCombiner<S: CommunicationService, R: CommunicationService> {
27 send_side: S,
28 read_side: R,
29 event_process_sender: EventSender,
30 event_sender: Option<EventSender>,
31 close_signal_sender: broadcast::Sender<()>,
32}
33
34impl<S: CommunicationService, R: CommunicationService> Drop for SplitCombiner<S, R> {
35 fn drop(&mut self) {
36 let _ = self.close_signal_sender.send(());
37 }
38}
39
40impl<S: CommunicationService, R: CommunicationService> SplitCombiner<S, R> {
41 pub fn new(send_side: S, read_side: R) -> Self {
42 let (event_process_sender, _) = broadcast::channel(16);
43 let (close_signal_sender, _) = broadcast::channel(1);
44 Self {
45 send_side,
46 read_side,
47 event_process_sender,
48 event_sender: None,
49 close_signal_sender,
50 }
51 }
52}
53
54#[async_trait]
55impl<S: CommunicationService, R: CommunicationService> CommunicationService
56 for SplitCombiner<S, R>
57{
58 fn inject(&mut self, api_receiver: APIReceiver, event_sender: EventSender) {
59 let (_, empty_api_receiver) = flume::bounded(1);
60 self
61 .send_side
62 .inject(api_receiver, self.event_process_sender.clone());
63 self
64 .read_side
65 .inject(empty_api_receiver, event_sender.clone());
66 self.event_sender = Some(event_sender);
67 }
68
69 async fn start_service(&self) -> ServiceStartResult<()> {
70 async fn processor(
71 mut close_signal: broadcast::Receiver<()>,
72 mut event_process_receiver: EventReceiver,
73 event_sender: EventSender,
74 ) -> anyhow::Result<()> {
75 loop {
76 select! {
77 _ = close_signal.recv() => return Err(anyhow::anyhow!("close")),
78 Ok(data) = event_process_receiver.recv() => {
79 if data.is_api_response() {
80 let _ = event_sender.send(data);
81 }
82 }
83 }
84 }
85 }
86
87 if self.event_sender.is_none() {
88 return Err(ServiceStartError::NotInjectedEventSender);
89 }
90 let event_sender = self.event_sender.clone().unwrap();
91
92 tokio::spawn(processor(
93 self.close_signal_sender.subscribe(),
94 self.event_process_sender.subscribe(),
95 event_sender,
96 ));
97
98 futures::try_join!(
99 self.send_side.start_service(),
100 self.read_side.start_service()
101 )?;
102 Ok(())
103 }
104}
105
106pub struct BothEventCombiner<S: CommunicationService, R: CommunicationService> {
111 send_side: S,
112 read_side: R,
113}
114
115impl<S: CommunicationService, R: CommunicationService> BothEventCombiner<S, R> {
116 pub fn new(send_side: S, read_side: R) -> Self {
117 Self {
118 send_side,
119 read_side,
120 }
121 }
122}
123
124impl<S: CommunicationService, R: CommunicationService> Drop for BothEventCombiner<S, R> {
125 fn drop(&mut self) {}
126}
127
128#[async_trait]
129impl<S: CommunicationService, R: CommunicationService> CommunicationService
130 for BothEventCombiner<S, R>
131{
132 fn inject(&mut self, api_receiver: APIReceiver, event_sender: EventSender) {
133 let (_, empty_api_receiver) = flume::bounded(1);
134 self.send_side.inject(api_receiver, event_sender.clone());
135 self.read_side.inject(empty_api_receiver, event_sender);
136 }
137
138 async fn start_service(&self) -> ServiceStartResult<()> {
139 futures::try_join!(
140 self.send_side.start_service(),
141 self.read_side.start_service()
142 )?;
143 Ok(())
144 }
145}