Skip to main content

onebot_api/communication/
combiner.rs

1use crate::communication::utils::*;
2use async_trait::async_trait;
3use tokio::select;
4use tokio::sync::broadcast;
5
6/// 将事件接收与API发送分为两个不同服务实现  
7/// 服务分为 `send_side` 与 `read_side`  
8/// 其中,`send_side` 负责API发送服务,`read_side` 负责事件接收服务  
9/// `send_side` 的事件通道由一个 processor task 负责  
10/// processor 将 `send_side` 的API响应事件并入原事件通道,其余事件丢弃
11/// # Examples
12/// ```rust
13/// use std::time::Duration;
14/// use onebot_api::communication::http::HttpService;
15/// use onebot_api::communication::sse::SseService;
16/// use onebot_api::communication::combiner::SplitCombiner;
17/// use onebot_api::communication::utils::Client;
18///
19/// let http_service = HttpService::new("https://example.com", Some("example_token".to_string())).unwrap();
20/// let sse_service = SseService::new("https://example.com/_events", Some("example_token".to_string())).unwrap();
21/// let combiner = SplitCombiner::new(http_service, sse_service);
22/// let client = Client::new(combiner, Duration::from_secs(5), None, None);
23/// client.start_service().await.unwrap();
24/// ```
25pub struct SplitCombiner<S: CommunicationService, R: CommunicationService> {
26	send_side: S,
27	read_side: R,
28	event_process_sender: EventSender,
29	event_sender: Option<EventSender>,
30	close_signal_sender: broadcast::Sender<()>,
31}
32
33impl<S: CommunicationService, R: CommunicationService> Drop for SplitCombiner<S, R> {
34	fn drop(&mut self) {
35		let _ = self.close_signal_sender.send(());
36	}
37}
38
39impl<S: CommunicationService, R: CommunicationService> SplitCombiner<S, R> {
40	pub fn new(send_side: S, read_side: R) -> Self {
41		let (event_process_sender, _) = broadcast::channel(16);
42		let (close_signal_sender, _) = broadcast::channel(1);
43		Self {
44			send_side,
45			read_side,
46			event_process_sender,
47			event_sender: None,
48			close_signal_sender,
49		}
50	}
51}
52
53#[async_trait]
54impl<S: CommunicationService, R: CommunicationService> CommunicationService
55	for SplitCombiner<S, R>
56{
57	fn inject(&mut self, api_receiver: APIReceiver, event_sender: EventSender) {
58		let (_, empty_api_receiver) = flume::bounded(1);
59		self
60			.send_side
61			.inject(api_receiver, self.event_process_sender.clone());
62		self
63			.read_side
64			.inject(empty_api_receiver, event_sender.clone());
65		self.event_sender = Some(event_sender);
66	}
67
68	async fn start_service(&self) -> anyhow::Result<()> {
69		async fn processor(
70			mut close_signal: broadcast::Receiver<()>,
71			mut event_process_receiver: EventReceiver,
72			event_sender: EventSender,
73		) -> anyhow::Result<()> {
74			loop {
75				select! {
76					_ = close_signal.recv() => return Err(anyhow::anyhow!("close")),
77					Ok(data) = event_process_receiver.recv() => {
78						if data.is_api_response() {
79							let _ = event_sender.send(data);
80						}
81					}
82				}
83			}
84		}
85
86		if self.event_sender.is_none() {
87			return Err(anyhow::anyhow!("event sender is none"));
88		}
89		let event_sender = self.event_sender.clone().unwrap();
90
91		tokio::spawn(processor(
92			self.close_signal_sender.subscribe(),
93			self.event_process_sender.subscribe(),
94			event_sender,
95		));
96
97		futures::try_join!(
98			self.send_side.start_service(),
99			self.read_side.start_service()
100		)?;
101		Ok(())
102	}
103}
104
105/// 详见 [`SplitCombiner`]  
106/// 与 `SplitCombiner` 的区别在于  
107/// `BothEventCombiner` 会将 `send_side` 的所有事件均并入原事件通道  
108/// 因此,`BothEventCombiner` 不存在 processor task
109pub struct BothEventCombiner<S: CommunicationService, R: CommunicationService> {
110	send_side: S,
111	read_side: R,
112}
113
114impl<S: CommunicationService, R: CommunicationService> BothEventCombiner<S, R> {
115	pub fn new(send_side: S, read_side: R) -> Self {
116		Self {
117			send_side,
118			read_side,
119		}
120	}
121}
122
123#[async_trait]
124impl<S: CommunicationService, R: CommunicationService> CommunicationService
125	for BothEventCombiner<S, R>
126{
127	fn inject(&mut self, api_receiver: APIReceiver, event_sender: EventSender) {
128		let (_, empty_api_receiver) = flume::bounded(1);
129		self.send_side.inject(api_receiver, event_sender.clone());
130		self.read_side.inject(empty_api_receiver, event_sender);
131	}
132
133	async fn start_service(&self) -> anyhow::Result<()> {
134		futures::try_join!(
135			self.send_side.start_service(),
136			self.read_side.start_service()
137		)?;
138		Ok(())
139	}
140}