Skip to main content

onebot_api/communication/
sse.rs

1use super::utils::*;
2use crate::error::{ServiceStartError, ServiceStartResult};
3use async_trait::async_trait;
4use bytes::Bytes;
5use eventsource_stream::{EventStream, Eventsource};
6use futures::{Stream, StreamExt};
7use reqwest::IntoUrl;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10use tokio::select;
11use tokio::sync::broadcast;
12use url::Url;
13
14#[derive(Debug, Clone)]
15pub struct SseService {
16	url: Url,
17	access_token: Option<String>,
18	event_sender: Option<InternalEventSender>,
19	close_signal_sender: broadcast::Sender<()>,
20	is_running: Arc<AtomicBool>,
21	// auto_reconnect: bool,
22	// reconnect_interval: Duration,
23	// reconnect_signal_sender: broadcast::Sender<()>
24}
25
26impl Drop for SseService {
27	fn drop(&mut self) {
28		self.uninstall();
29	}
30}
31
32impl SseService {
33	pub fn new(
34		url: impl IntoUrl,
35		access_token: Option<String>,
36		// auto_reconnect: Option<bool>,
37		// reconnect_interval: Option<Duration>,
38	) -> reqwest::Result<Self> {
39		let (close_signal_sender, _) = broadcast::channel(1);
40		// let (reconnect_signal_sender, _) = broadcast::channel(1);
41		Ok(Self {
42			url: url.into_url()?,
43			access_token,
44			event_sender: None,
45			close_signal_sender,
46			is_running: Arc::new(AtomicBool::new(false)),
47			// auto_reconnect: auto_reconnect.unwrap_or(true),
48			// reconnect_interval: reconnect_interval.unwrap_or(Duration::from_secs(10)),
49			// reconnect_signal_sender
50		})
51	}
52
53	pub async fn eventsource(
54		&self,
55	) -> anyhow::Result<EventStream<impl Stream<Item = reqwest::Result<Bytes>>>> {
56		let client = reqwest::Client::new();
57		let mut req = client.get(self.url.clone());
58		if let Some(token) = &self.access_token {
59			req = req.header("Authorization", "Bearer ".to_string() + token);
60		}
61		let eventsource = req.send().await?.bytes_stream().eventsource();
62		Ok(eventsource)
63	}
64
65	async fn eventsource_listener(self) -> anyhow::Result<()> {
66		let mut close_signal = self.close_signal_sender.subscribe();
67		let mut es = self.eventsource().await?;
68		let event_sender = self.event_sender.clone().unwrap();
69		loop {
70			select! {
71				_ = close_signal.recv() => return Err(anyhow::anyhow!("close")),
72				event_option = es.next() => {
73					if let Some(Ok(es_event)) = event_option {
74						let event = serde_json::from_str(&es_event.data);
75						if event.is_err() {
76							continue
77						}
78						let _ = event_sender.send_async(event?).await;
79					}
80				}
81			}
82		}
83	}
84
85	// async fn reconnect_processor(self) -> anyhow::Result<()> {
86	// 	let mut close_signal = self.close_signal_sender.subscribe();
87	// 	let mut reconnect_signal = self.reconnect_signal_sender.subscribe();
88	//
89	// 	loop {
90	// 		select! {
91	// 			_ = close_signal.recv() => return Err(anyhow::anyhow!("close")),
92	// 			_ = reconnect_signal.recv() => {
93	//
94	// 			}
95	// 		}
96	// 	}
97	// }
98}
99
100#[async_trait]
101impl CommunicationService for SseService {
102	fn install(&mut self, _api_receiver: InternalAPIReceiver, event_sender: InternalEventSender) {
103		self.event_sender = Some(event_sender);
104	}
105
106	fn uninstall(&mut self) {
107		self.stop();
108		self.event_sender = None;
109	}
110
111	fn stop(&self) {
112		let _ = self.close_signal_sender.send(());
113		self.is_running.store(false, Ordering::Relaxed);
114	}
115
116	async fn start(&self) -> ServiceStartResult<()> {
117		if self.is_running.load(Ordering::Relaxed) {
118			return Err(ServiceStartError::TaskIsRunning);
119		}
120
121		if self.event_sender.is_none() {
122			return Err(ServiceStartError::NotInjectedEventSender);
123		}
124
125		self.is_running.store(true, Ordering::Relaxed);
126		tokio::spawn(Self::eventsource_listener(self.clone()));
127
128		Ok(())
129	}
130}