1use anyhow::Result;
2use cfg_if::cfg_if;
3use futures::Future;
4use gloo_net::http::Headers;
5use pikav::Event;
6use serde_json::Value;
7
8cfg_if! {
9 if #[cfg(feature = "hydrate")] {
10 use std::{cell::RefCell, pin::Pin, rc::Rc};
11 use std::{
12 collections::HashSet,
13 sync::atomic::{AtomicUsize, Ordering},
14 };
15 use futures::{future::BoxFuture, StreamExt};
16 use gloo_net::{
17 eventsource::futures::EventSource,
18 http::{Request, Response},
19 };
20 use log::error;
21 use wasm_bindgen_futures::spawn_local;
22
23 type HeadersFut = Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result<Headers>>>>>;
24 type ListenerFut = Box<dyn Fn(Event<Value, Value>) -> BoxFuture<'static, ()>>;
25 }
26}
27
/// Event-stream client (`hydrate` build).
///
/// Every piece of mutable state sits behind an `Rc`, so clones of a client
/// share the same session id, event source, header callback and listeners.
#[cfg(feature = "hydrate")]
#[derive(Clone)]
pub struct Client {
    /// Session id, filled in when a `$SYS/session` / `Created` event arrives.
    id: Rc<RefCell<Option<String>>>,
    /// URL the event source connects to (`{endpoint}/events`, fixed in `new`).
    source_url: String,
    /// The open event source, stored by `run`.
    source: Rc<RefCell<Option<EventSource>>>,
    /// Base URL used for subscribe / unsubscribe requests.
    endpoint: String,
    /// Prefix prepended to every filter passed to `subscribe`.
    namespace: String,
    /// Counter handing out a distinct id to each registered listener.
    next_listener_id: Rc<AtomicUsize>,
    /// Optional callback producing extra request headers.
    get_headers: Rc<RefCell<Option<HeadersFut>>>,
    /// Registered listeners as `(listener id, namespaced filter, callback)`.
    listeners: Rc<RefCell<Vec<(usize, String, ListenerFut)>>>,
}

/// Event-stream client (build without `hydrate`).
///
/// Only carries configuration; the methods that would touch the network are
/// no-ops in this build.
#[cfg(not(feature = "hydrate"))]
#[derive(Clone)]
pub struct Client {
    /// Base URL of the service.
    endpoint: String,
    /// Namespace configured for this client.
    namespace: String,
}
49
50impl Client {
51 pub fn new(endpoint: impl Into<String>) -> Self {
52 let endpoint = endpoint.into();
53
54 cfg_if! {
55 if #[cfg(feature = "hydrate")] {
56 Self {
57 id: Rc::default(),
58 get_headers: Rc::default(),
59 next_listener_id: Rc::default(),
60 listeners: Rc::default(),
61 source: Rc::default(),
62 source_url: format!("{endpoint}/events"),
63 endpoint,
64 namespace: "_".to_owned(),
65 }
66 } else {
67 Self {
68 endpoint,
69 namespace: "_".to_owned(),
70 }
71 }
72 }
73 }
74
75 pub fn run(self) -> Result<Self> {
76 cfg_if! {
77 if #[cfg(feature = "hydrate")] {
78 let mut source = gloo_net::eventsource::futures::EventSource::new(&self.source_url)?;
79 let mut stream = source.subscribe("message")?;
80 *self.source.borrow_mut() = Some(source);
81 let id = self.id.clone();
82 let listeners = self.listeners.clone();
83 let fetcher = Fetcher::from(&self);
84
85 spawn_local(async move {
86 while let Some(Ok((_, msg))) = stream.next().await {
87 if msg.data().as_string() == Some("ping".to_owned()) {
88 continue;
89 }
90
91 let data = match msg.data().as_string() {
92 Some(data) => data,
93 _ => {
94 error!("invalid type: {:?}", msg.data());
95 continue;
96 }
97 };
98
99 let event = match serde_json::from_str::<Event<Value, Value>>(&data) {
100 Ok(data) => data,
101 Err(e) => {
102 error!("invalid type: {:?}", e);
103 continue;
104 }
105 };
106
107 if matches!(
108 (event.topic.as_ref(), event.name.as_ref()),
109 ("$SYS/session", "Created")
110 ) {
111 *id.borrow_mut() = event.data.as_str().map(|v| v.to_owned());
112
113 let mut subscribed = HashSet::new();
114
115 if let Some(client_id) = event.data.as_str() {
116 let filters = {
117 listeners
118 .borrow()
119 .iter()
120 .map(|(_, f, _)| f.to_owned())
121 .collect::<Vec<_>>()
122 };
123
124 for filter in filters {
125 if subscribed.contains(&filter) {
126 continue;
127 }
128
129 if let Err(e) = fetcher.fetch(client_id, "subscribe", &filter).await {
130 error!("{e}");
131 }
132
133 subscribed.insert(filter);
134 }
135 }
136 }
137
138 let listeners_fut = {
139 let mut listeners_fut = Vec::new();
140 for (_, filter, listener) in listeners.borrow().iter() {
141 let filters = match &event.filters {
142 Some(v) => v,
143 _ => continue,
144 };
145
146 if filters.iter().any(|f| f == filter) {
147 listeners_fut.push(listener(event.clone()));
148 }
149 }
150 listeners_fut
151 };
152
153 futures::future::join_all(listeners_fut).await;
154 }
155 });
156 }
157 }
158
159 Ok(self)
160 }
161
162 pub fn endpoint(mut self, v: impl Into<String>) -> Self {
163 self.endpoint = v.into();
164
165 self
166 }
167
168 pub fn namespace(mut self, v: impl Into<String>) -> Self {
169 self.namespace = v.into();
170
171 self
172 }
173
174 pub fn close(&self) {
175 cfg_if! {
176 if #[cfg(feature = "hydrate")] {
177 if let Some(source) = self.source.borrow().as_ref() {
178 source.clone().close();
179 }
180 }
181 }
182 }
183
184 cfg_if! {
185 if #[cfg(feature = "hydrate")] {
186 pub fn get_headers<Fu>(self, cb: impl Fn() -> Fu + 'static) -> Self
187 where
188 Fu: Future<Output = Result<Headers>> + 'static,
189 {
190 let get_headers = self.get_headers.clone();
191 *get_headers.borrow_mut() = Some(Box::new(move || Box::pin(cb())));
192
193 self
194 }
195 } else {
196 pub fn get_headers<Fu>(self, _cb: impl Fn() -> Fu + 'static) -> Self
197 where
198 Fu: Future<Output = Result<Headers>> + 'static,
199 {
200 self
201 }
202 }
203 }
204 cfg_if! {
205 if #[cfg(feature = "hydrate")] {
206 pub fn subscribe<Fu>(
207 &self,
208 filter: impl Into<String>,
209 listener: impl Fn(Event<Value, Value>) -> Fu + 'static,
210 ) -> impl FnOnce()
211 where
212 Fu: Future<Output = ()> + 'static + Send,
213 {
214 let filter = format!("{}/{}", self.namespace, filter.into());
215 let id = self.next_listener_id.fetch_add(1, Ordering::Relaxed);
216 let listeners = self.listeners.clone();
217
218 listeners
219 .borrow_mut()
220 .push((id, filter.clone(), Box::new(move |e| Box::pin(listener(e)))));
221
222 let total_filters = listeners
223 .borrow()
224 .iter()
225 .filter(|(_, f, _)| f == &filter)
226 .count();
227
228 let fetcher = Fetcher::from(self);
229
230 if let (Some(client_id), 1) = (self.id.borrow().to_owned(), total_filters) {
231 let filter = filter.clone();
232 let fetcher = fetcher.clone();
233
234 spawn_local(async move {
235 if let Err(e) = fetcher.fetch(&client_id, "subscribe", &filter).await {
236 error!("{e}");
237 }
238 });
239 }
240
241 let client_id = self.id.clone();
242
243 move || {
244 listeners.borrow_mut().retain(|l| l.0 != id);
245
246 let total_filters = listeners
247 .borrow()
248 .iter()
249 .filter(|(_, f, _)| f == &filter)
250 .count();
251
252 if total_filters > 0 {
253 return;
254 }
255
256 if let Some(client_id) = client_id.borrow().to_owned() {
257 spawn_local(async move {
258 if let Err(e) = fetcher.fetch(&client_id, "unsubscribe", &filter).await {
259 error!("{e}");
260 }
261 });
262 }
263 }
264 }
265 }
266 else {
267 pub fn subscribe<Fu>(
268 &self,
269 _filter: impl Into<String>,
270 _listener: impl Fn(Event<Value, Value>) -> Fu + 'static,
271 ) -> impl FnOnce()
272 where
273 Fu: Future<Output = ()> + 'static + Send,
274 {
275 move || {}
276 }
277 }
278 }
279}
280
281cfg_if! {
282 if #[cfg(feature = "hydrate")] {
283 #[derive(Clone)]
284 struct Fetcher {
285 endpoint: String,
286 get_headers: Rc<RefCell<Option<HeadersFut>>>,
287 }
288
289 impl Fetcher {
290 pub async fn fetch(
291 &self,
292 client_id: &str,
293 action: impl Into<String>,
294 filter: &str,
295 ) -> Result<Response> {
296 let filter = filter.to_string();
297 let mut req = Request::put(&format!("{}/{}/{}", self.endpoint, action.into(), filter));
298 let get_headers = { self.get_headers.borrow().as_ref().map(|f| f()) };
299
300 if let Some(get_headers) = get_headers {
301 let headers = get_headers.await?;
302 req = req.headers(headers);
303 }
304
305 let res = req
306 .header("Accept", "application/json")
307 .header("Content-Type", "application/json")
308 .header("X-Pikav-Client-ID", client_id)
309 .send()
310 .await?;
311
312 Ok(res)
313 }
314 }
315
316 impl From<&Client> for Fetcher {
317 fn from(value: &Client) -> Self {
318 Self {
319 endpoint: value.endpoint.to_owned(),
320 get_headers: value.get_headers.clone(),
321 }
322 }
323 }
324 }
325}