Skip to main content

onebot_api/communication/
sse.rs

1use super::utils::*;
2use crate::error::{ServiceStartError, ServiceStartResult};
3use async_trait::async_trait;
4use bytes::Bytes;
5use eventsource_stream::{EventStream, Eventsource};
6use futures::{Stream, StreamExt};
7use reqwest::IntoUrl;
8use std::sync::Arc;
9use tokio::select;
10use tokio::sync::broadcast;
11use url::Url;
12
13#[derive(Debug, Clone)]
14pub struct SseService {
15	url: Url,
16	access_token: Option<String>,
17	event_sender: Option<EventSender>,
18	close_signal_sender: broadcast::Sender<()>,
19}
20
21impl Drop for SseService {
22	fn drop(&mut self) {
23		let _ = self.close_signal_sender.send(());
24	}
25}
26
27impl SseService {
28	pub fn new(url: impl IntoUrl, access_token: Option<String>) -> reqwest::Result<Self> {
29		let (close_signal_sender, _) = broadcast::channel(1);
30		Ok(Self {
31			url: url.into_url()?,
32			access_token,
33			event_sender: None,
34			close_signal_sender,
35		})
36	}
37
38	pub async fn eventsource(
39		&self,
40	) -> anyhow::Result<EventStream<impl Stream<Item = reqwest::Result<Bytes>>>> {
41		let client = reqwest::Client::new();
42		let mut req = client.get(self.url.clone());
43		if let Some(token) = &self.access_token {
44			req = req.header("Authorization", "Bearer ".to_string() + token);
45		}
46		let eventsource = req.send().await?.bytes_stream().eventsource();
47		Ok(eventsource)
48	}
49
50	async fn eventsource_listener(self) -> anyhow::Result<()> {
51		let mut close_signal = self.close_signal_sender.subscribe();
52		let mut es = self.eventsource().await?;
53		let event_sender = self.event_sender.clone().unwrap();
54		loop {
55			select! {
56				_ = close_signal.recv() => return Err(anyhow::anyhow!("close")),
57				Some(Ok(es_event)) = es.next() => {
58					let event = serde_json::from_str(&es_event.data);
59					if event.is_err() {
60						continue
61					}
62					let _ = event_sender.send(Arc::new(event?));
63				}
64			}
65		}
66	}
67}
68
69#[async_trait]
70impl CommunicationService for SseService {
71	fn inject(&mut self, _api_receiver: APIReceiver, event_sender: EventSender) {
72		self.event_sender = Some(event_sender);
73	}
74
75	async fn start_service(&self) -> ServiceStartResult<()> {
76		if self.event_sender.is_none() {
77			return Err(ServiceStartError::NotInjectedEventSender);
78		}
79
80		tokio::spawn(Self::eventsource_listener(self.clone()));
81
82		Ok(())
83	}
84}