Skip to main content

onebot_api/communication/
combiner.rs

1use crate::communication::utils::*;
2use crate::error::{ServiceStartError, ServiceStartResult};
3use async_trait::async_trait;
4use tokio::select;
5use tokio::sync::broadcast;
6
7/// 将事件接收与API发送分为两个不同服务实现  
8/// 服务分为 `send_side` 与 `read_side`  
9/// 其中,`send_side` 负责API发送服务,`read_side` 负责事件接收服务  
10/// `send_side` 的事件通道由一个 processor task 负责  
11/// processor 将 `send_side` 的API响应事件并入原事件通道,其余事件丢弃
12/// # Examples
13/// ```rust
14/// use std::time::Duration;
15/// use onebot_api::communication::http::HttpService;
16/// use onebot_api::communication::sse::SseService;
17/// use onebot_api::communication::combiner::SplitCombiner;
18/// use onebot_api::communication::utils::Client;
19///
20/// let http_service = HttpService::new("https://example.com", Some("example_token".to_string())).unwrap();
21/// let sse_service = SseService::new("https://example.com/_events", Some("example_token".to_string())).unwrap();
22/// let combiner = SplitCombiner::new(http_service, sse_service);
23/// let client = Client::new_with_options(combiner, Duration::from_secs(5), None, None);
24/// client.start_service().await.unwrap();
25/// ```
26pub struct SplitCombiner<S: CommunicationService, R: CommunicationService> {
27	send_side: S,
28	read_side: R,
29	event_process_sender: InternalEventSender,
30	event_process_receiver: InternalEventReceiver,
31	event_sender: Option<InternalEventSender>,
32	close_signal_sender: broadcast::Sender<()>,
33}
34
35impl<S: CommunicationService, R: CommunicationService> Drop for SplitCombiner<S, R> {
36	fn drop(&mut self) {
37		self.uninstall();
38	}
39}
40
41impl<S: CommunicationService, R: CommunicationService> SplitCombiner<S, R> {
42	pub fn new(send_side: S, read_side: R) -> Self {
43		let (event_process_sender, event_process_receiver) = flume::bounded(16);
44		let (close_signal_sender, _) = broadcast::channel(1);
45		Self {
46			send_side,
47			read_side,
48			event_process_sender,
49			event_process_receiver,
50			event_sender: None,
51			close_signal_sender,
52		}
53	}
54}
55
56#[async_trait]
57impl<S: CommunicationService, R: CommunicationService> CommunicationService
58	for SplitCombiner<S, R>
59{
60	fn install(&mut self, api_receiver: InternalAPIReceiver, event_sender: InternalEventSender) {
61		let (_, empty_api_receiver) = flume::bounded(1);
62		self
63			.send_side
64			.install(api_receiver, self.event_process_sender.clone());
65		self
66			.read_side
67			.install(empty_api_receiver, event_sender.clone());
68		self.event_sender = Some(event_sender);
69	}
70
71	fn uninstall(&mut self) {
72		self.stop();
73		self.read_side.uninstall();
74		self.send_side.uninstall();
75		self.event_sender = None;
76	}
77
78	fn stop(&self) {
79		let _ = self.close_signal_sender.send(());
80		self.send_side.stop();
81		self.read_side.stop();
82	}
83
84	async fn start(&self) -> ServiceStartResult<()> {
85		async fn processor(
86			mut close_signal: broadcast::Receiver<()>,
87			event_process_receiver: InternalEventReceiver,
88			event_sender: InternalEventSender,
89		) -> anyhow::Result<()> {
90			loop {
91				select! {
92					_ = close_signal.recv() => return Err(anyhow::anyhow!("close")),
93					Ok(data) = event_process_receiver.recv_async() => {
94						if data.is_api_response() {
95							let _ = event_sender.send(data);
96						}
97					}
98				}
99			}
100		}
101
102		if self.event_sender.is_none() {
103			return Err(ServiceStartError::NotInjectedEventSender);
104		}
105		let event_sender = self.event_sender.clone().unwrap();
106
107		tokio::spawn(processor(
108			self.close_signal_sender.subscribe(),
109			self.event_process_receiver.clone(),
110			event_sender,
111		));
112
113		futures::try_join!(self.send_side.start(), self.read_side.start())?;
114		Ok(())
115	}
116
117	async fn restart(&self) -> ServiceStartResult<()> {
118		futures::try_join!(self.send_side.restart(), self.read_side.restart())?;
119		Ok(())
120	}
121}
122
123/// 详见 [`SplitCombiner`]  
124/// 与 `SplitCombiner` 的区别在于  
125/// `BothEventCombiner` 会将 `send_side` 的所有事件均并入原事件通道  
126/// 因此,`BothEventCombiner` 不存在 processor task
127pub struct BothEventCombiner<S: CommunicationService, R: CommunicationService> {
128	send_side: S,
129	read_side: R,
130}
131
132impl<S: CommunicationService, R: CommunicationService> BothEventCombiner<S, R> {
133	pub fn new(send_side: S, read_side: R) -> Self {
134		Self {
135			send_side,
136			read_side,
137		}
138	}
139}
140
141impl<S: CommunicationService, R: CommunicationService> Drop for BothEventCombiner<S, R> {
142	fn drop(&mut self) {}
143}
144
145#[async_trait]
146impl<S: CommunicationService, R: CommunicationService> CommunicationService
147	for BothEventCombiner<S, R>
148{
149	fn install(&mut self, api_receiver: InternalAPIReceiver, event_sender: InternalEventSender) {
150		let (_, empty_api_receiver) = flume::bounded(1);
151		self.send_side.install(api_receiver, event_sender.clone());
152		self.read_side.install(empty_api_receiver, event_sender);
153	}
154
155	fn uninstall(&mut self) {
156		self.send_side.uninstall();
157		self.read_side.uninstall();
158	}
159
160	fn stop(&self) {
161		self.send_side.stop();
162		self.read_side.stop();
163	}
164
165	async fn start(&self) -> ServiceStartResult<()> {
166		futures::try_join!(self.send_side.start(), self.read_side.start())?;
167		Ok(())
168	}
169
170	async fn restart(&self) -> ServiceStartResult<()> {
171		futures::try_join!(self.send_side.restart(), self.read_side.restart())?;
172		Ok(())
173	}
174}