Skip to main content

onebot_api/communication/
combiner.rs

1use crate::communication::utils::*;
2use crate::error::{ServiceStartError, ServiceStartResult};
3use async_trait::async_trait;
4use tokio::select;
5use tokio::sync::broadcast;
6
7/// 将事件接收与API发送分为两个不同服务实现  
8/// 服务分为 `send_side` 与 `read_side`  
9/// 其中,`send_side` 负责API发送服务,`read_side` 负责事件接收服务  
10/// `send_side` 的事件通道由一个 processor task 负责  
11/// processor 将 `send_side` 的API响应事件并入原事件通道,其余事件丢弃
12/// # Examples
13/// ```rust
14/// use std::time::Duration;
15/// use onebot_api::communication::http::HttpService;
16/// use onebot_api::communication::sse::SseService;
17/// use onebot_api::communication::combiner::SplitCombiner;
18/// use onebot_api::communication::utils::Client;
19///
20/// let http_service = HttpService::new("https://example.com", Some("example_token".to_string())).unwrap();
21/// let sse_service = SseService::new("https://example.com/_events", Some("example_token".to_string())).unwrap();
22/// let combiner = SplitCombiner::new(http_service, sse_service);
23/// let client = Client::new(combiner, Duration::from_secs(5), None, None);
24/// client.start_service().await.unwrap();
25/// ```
26pub struct SplitCombiner<S: CommunicationService, R: CommunicationService> {
27	send_side: S,
28	read_side: R,
29	event_process_sender: EventSender,
30	event_sender: Option<EventSender>,
31	close_signal_sender: broadcast::Sender<()>,
32}
33
34impl<S: CommunicationService, R: CommunicationService> Drop for SplitCombiner<S, R> {
35	fn drop(&mut self) {
36		let _ = self.close_signal_sender.send(());
37	}
38}
39
40impl<S: CommunicationService, R: CommunicationService> SplitCombiner<S, R> {
41	pub fn new(send_side: S, read_side: R) -> Self {
42		let (event_process_sender, _) = broadcast::channel(16);
43		let (close_signal_sender, _) = broadcast::channel(1);
44		Self {
45			send_side,
46			read_side,
47			event_process_sender,
48			event_sender: None,
49			close_signal_sender,
50		}
51	}
52}
53
54#[async_trait]
55impl<S: CommunicationService, R: CommunicationService> CommunicationService
56	for SplitCombiner<S, R>
57{
58	fn inject(&mut self, api_receiver: APIReceiver, event_sender: EventSender) {
59		let (_, empty_api_receiver) = flume::bounded(1);
60		self
61			.send_side
62			.inject(api_receiver, self.event_process_sender.clone());
63		self
64			.read_side
65			.inject(empty_api_receiver, event_sender.clone());
66		self.event_sender = Some(event_sender);
67	}
68
69	async fn start_service(&self) -> ServiceStartResult<()> {
70		async fn processor(
71			mut close_signal: broadcast::Receiver<()>,
72			mut event_process_receiver: EventReceiver,
73			event_sender: EventSender,
74		) -> anyhow::Result<()> {
75			loop {
76				select! {
77					_ = close_signal.recv() => return Err(anyhow::anyhow!("close")),
78					Ok(data) = event_process_receiver.recv() => {
79						if data.is_api_response() {
80							let _ = event_sender.send(data);
81						}
82					}
83				}
84			}
85		}
86
87		if self.event_sender.is_none() {
88			return Err(ServiceStartError::NotInjectedEventSender);
89		}
90		let event_sender = self.event_sender.clone().unwrap();
91
92		tokio::spawn(processor(
93			self.close_signal_sender.subscribe(),
94			self.event_process_sender.subscribe(),
95			event_sender,
96		));
97
98		futures::try_join!(
99			self.send_side.start_service(),
100			self.read_side.start_service()
101		)?;
102		Ok(())
103	}
104}
105
106/// 详见 [`SplitCombiner`]  
107/// 与 `SplitCombiner` 的区别在于  
108/// `BothEventCombiner` 会将 `send_side` 的所有事件均并入原事件通道  
109/// 因此,`BothEventCombiner` 不存在 processor task
110pub struct BothEventCombiner<S: CommunicationService, R: CommunicationService> {
111	send_side: S,
112	read_side: R,
113}
114
115impl<S: CommunicationService, R: CommunicationService> BothEventCombiner<S, R> {
116	pub fn new(send_side: S, read_side: R) -> Self {
117		Self {
118			send_side,
119			read_side,
120		}
121	}
122}
123
124impl<S: CommunicationService, R: CommunicationService> Drop for BothEventCombiner<S, R> {
125	fn drop(&mut self) {}
126}
127
128#[async_trait]
129impl<S: CommunicationService, R: CommunicationService> CommunicationService
130	for BothEventCombiner<S, R>
131{
132	fn inject(&mut self, api_receiver: APIReceiver, event_sender: EventSender) {
133		let (_, empty_api_receiver) = flume::bounded(1);
134		self.send_side.inject(api_receiver, event_sender.clone());
135		self.read_side.inject(empty_api_receiver, event_sender);
136	}
137
138	async fn start_service(&self) -> ServiceStartResult<()> {
139		futures::try_join!(
140			self.send_side.start_service(),
141			self.read_side.start_service()
142		)?;
143		Ok(())
144	}
145}