onebot_api/communication/
sse.rs1use super::utils::*;
2use crate::error::{ServiceStartError, ServiceStartResult};
3use async_trait::async_trait;
4use bytes::Bytes;
5use eventsource_stream::{EventStream, Eventsource};
6use futures::{Stream, StreamExt};
7use reqwest::IntoUrl;
8use std::sync::Arc;
9use tokio::select;
10use tokio::sync::broadcast;
11use url::Url;
12
13#[derive(Debug, Clone)]
14pub struct SseService {
15 url: Url,
16 access_token: Option<String>,
17 event_sender: Option<EventSender>,
18 close_signal_sender: broadcast::Sender<()>,
19}
20
21impl Drop for SseService {
22 fn drop(&mut self) {
23 let _ = self.close_signal_sender.send(());
24 }
25}
26
27impl SseService {
28 pub fn new(url: impl IntoUrl, access_token: Option<String>) -> reqwest::Result<Self> {
29 let (close_signal_sender, _) = broadcast::channel(1);
30 Ok(Self {
31 url: url.into_url()?,
32 access_token,
33 event_sender: None,
34 close_signal_sender,
35 })
36 }
37
38 pub async fn eventsource(
39 &self,
40 ) -> anyhow::Result<EventStream<impl Stream<Item = reqwest::Result<Bytes>>>> {
41 let client = reqwest::Client::new();
42 let mut req = client.get(self.url.clone());
43 if let Some(token) = &self.access_token {
44 req = req.header("Authorization", "Bearer ".to_string() + token);
45 }
46 let eventsource = req.send().await?.bytes_stream().eventsource();
47 Ok(eventsource)
48 }
49
50 async fn eventsource_listener(self) -> anyhow::Result<()> {
51 let mut close_signal = self.close_signal_sender.subscribe();
52 let mut es = self.eventsource().await?;
53 let event_sender = self.event_sender.clone().unwrap();
54 loop {
55 select! {
56 _ = close_signal.recv() => return Err(anyhow::anyhow!("close")),
57 Some(Ok(es_event)) = es.next() => {
58 let event = serde_json::from_str(&es_event.data);
59 if event.is_err() {
60 continue
61 }
62 let _ = event_sender.send(Arc::new(event?));
63 }
64 }
65 }
66 }
67}
68
69#[async_trait]
70impl CommunicationService for SseService {
71 fn inject(&mut self, _api_receiver: APIReceiver, event_sender: EventSender) {
72 self.event_sender = Some(event_sender);
73 }
74
75 async fn start_service(&self) -> ServiceStartResult<()> {
76 if self.event_sender.is_none() {
77 return Err(ServiceStartError::NotInjectedEventSender);
78 }
79
80 tokio::spawn(Self::eventsource_listener(self.clone()));
81
82 Ok(())
83 }
84}