1use std::pin::Pin;
6
7use futures_core::Stream;
8use serde_json::Value;
9use tokio::sync::mpsc;
10use tokio::task::JoinHandle;
11
12use crate::client::{decode_or_raise, AudDInner};
13use crate::errors::{AudDError, ErrorKind};
14use crate::helpers::{add_return_to_url, derive_longpoll_category, parse_callback};
15use crate::http::HttpClient;
16use crate::models::{
17 CallbackEvent, Stream as StreamRow, StreamCallbackMatch, StreamCallbackNotification,
18};
19use crate::retry::{retry_async, RetryPolicy};
20
/// Error code that the longpoll preflight check compares against the value
/// returned by `AudDError::error_code()` to detect a missing callback URL.
const NO_CALLBACK_ERROR_CODE: i32 = 19;

/// Longpoll responses whose HTTP status is at or above this value are
/// reported on the error stream instead of being parsed.
const HTTP_CLIENT_ERROR_FLOOR: u16 = 400;

/// Message placed in the error returned by the preflight check when the
/// lookup fails with `NO_CALLBACK_ERROR_CODE`.
const PREFLIGHT_NO_CALLBACK_HINT: &str =
    "Longpoll won't deliver events because no callback URL is configured for this account. \
Set one first via streams.set_callback_url(...) — `https://audd.tech/empty/` is fine if \
you only want longpolling and don't need a real receiver. \
To skip this check, pass skip_callback_check=true.";

/// Capacity of the bounded channels that carry matches and notifications
/// out of the background longpoll task.
const CHANNEL_BUFFER: usize = 16;

/// Pinned, heap-allocated, `Send` stream yielding `T`.
type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
40
/// Settings for a longpoll session.
///
/// Start from [`Default`] and chain the consuming setters below.
#[derive(Debug, Clone)]
pub struct LongpollOptions {
    /// Sent as the `since_time` query parameter of the first request when set.
    since_time: Option<i64>,
    /// Sent as the `timeout` query parameter of every request.
    timeout: i64,
    /// When `true`, the callback-URL preflight check is skipped.
    skip_callback_check: bool,
}

impl Default for LongpollOptions {
    /// No `since_time`, a timeout of `50`, and the preflight check enabled.
    fn default() -> Self {
        let (since_time, timeout, skip_callback_check) = (None, 50, false);
        Self {
            since_time,
            timeout,
            skip_callback_check,
        }
    }
}

impl LongpollOptions {
    /// Returns the options with `since_time` set to `Some(t)`.
    #[must_use]
    pub fn since_time(self, t: i64) -> Self {
        Self {
            since_time: Some(t),
            ..self
        }
    }

    /// Returns the options with the timeout replaced by `secs`.
    #[must_use]
    pub fn timeout(self, secs: i64) -> Self {
        Self {
            timeout: secs,
            ..self
        }
    }

    /// Returns the options with the preflight check disabled when `skip` is `true`.
    #[must_use]
    pub fn skip_callback_check(self, skip: bool) -> Self {
        Self {
            skip_callback_check: skip,
            ..self
        }
    }
}
82
83pub struct LongpollPoll {
93 pub matches: BoxStream<StreamCallbackMatch>,
95 pub notifications: BoxStream<StreamCallbackNotification>,
97 pub errors: BoxStream<AudDError>,
99
100 shutdown: Option<mpsc::Sender<()>>,
101 join: Option<JoinHandle<()>>,
102}
103
104impl std::fmt::Debug for LongpollPoll {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 f.debug_struct("LongpollPoll").finish_non_exhaustive()
107 }
108}
109
110impl LongpollPoll {
111 pub async fn close(mut self) {
114 self.close_internal().await;
115 }
116
117 async fn close_internal(&mut self) {
118 self.shutdown.take();
120 if let Some(handle) = self.join.take() {
121 let _ = handle.await;
122 }
123 }
124}
125
126impl Drop for LongpollPoll {
127 fn drop(&mut self) {
128 self.shutdown.take();
132 if let Some(handle) = self.join.take() {
133 handle.abort();
134 }
135 }
136}
137
138pub struct Streams<'a> {
140 inner: &'a AudDInner,
141}
142
143impl<'a> Streams<'a> {
144 pub(crate) fn new(inner: &'a AudDInner) -> Self {
145 Self { inner }
146 }
147
148 pub async fn set_callback_url(
156 &self,
157 url: &str,
158 return_metadata: Option<&[String]>,
159 ) -> Result<(), AudDError> {
160 let url = add_return_to_url(url, return_metadata)?;
161 post_form(
162 &self.inner.http,
163 &format!("{}/setCallbackUrl/", self.inner.api_base),
164 &[("url", url)],
165 self.inner.mutating_policy(),
166 )
167 .await
168 .map(drop)
169 }
170
171 pub async fn get_callback_url(&self) -> Result<String, AudDError> {
177 let result = post_form(
178 &self.inner.http,
179 &format!("{}/getCallbackUrl/", self.inner.api_base),
180 &[],
181 self.inner.read_policy(),
182 )
183 .await?;
184 Ok(result
185 .as_str()
186 .map_or_else(|| result.to_string(), str::to_string))
187 }
188
189 pub async fn add(
200 &self,
201 url: &str,
202 radio_id: i64,
203 callbacks: Option<&str>,
204 ) -> Result<(), AudDError> {
205 let mut fields: Vec<(&str, String)> =
206 vec![("url", url.to_string()), ("radio_id", radio_id.to_string())];
207 if let Some(cb) = callbacks {
208 fields.push(("callbacks", cb.to_string()));
209 }
210 post_form(
211 &self.inner.http,
212 &format!("{}/addStream/", self.inner.api_base),
213 &fields,
214 self.inner.mutating_policy(),
215 )
216 .await
217 .map(drop)
218 }
219
220 pub async fn set_url(&self, radio_id: i64, url: &str) -> Result<(), AudDError> {
226 post_form(
227 &self.inner.http,
228 &format!("{}/setStreamUrl/", self.inner.api_base),
229 &[("radio_id", radio_id.to_string()), ("url", url.to_string())],
230 self.inner.mutating_policy(),
231 )
232 .await
233 .map(drop)
234 }
235
236 pub async fn delete(&self, radio_id: i64) -> Result<(), AudDError> {
242 post_form(
243 &self.inner.http,
244 &format!("{}/deleteStream/", self.inner.api_base),
245 &[("radio_id", radio_id.to_string())],
246 self.inner.mutating_policy(),
247 )
248 .await
249 .map(drop)
250 }
251
252 pub async fn list(&self) -> Result<Vec<StreamRow>, AudDError> {
258 let result = post_form(
259 &self.inner.http,
260 &format!("{}/getStreams/", self.inner.api_base),
261 &[],
262 self.inner.read_policy(),
263 )
264 .await?;
265 if result.is_null() {
266 return Ok(Vec::new());
267 }
268 let v: Vec<StreamRow> =
269 serde_json::from_value(result.clone()).map_err(|e| AudDError::Serialization {
270 message: format!("could not parse getStreams result: {e}"),
271 raw_text: result.to_string(),
272 })?;
273 Ok(v)
274 }
275
276 #[must_use]
280 pub fn derive_longpoll_category(&self, radio_id: i64) -> String {
281 derive_longpoll_category(&self.inner.api_token(), radio_id)
282 }
283
284 pub fn parse_callback(&self, body: Value) -> Result<CallbackEvent, AudDError> {
291 parse_callback(body)
292 }
293
294 pub async fn longpoll(
312 &self,
313 category: &str,
314 opts: LongpollOptions,
315 ) -> Result<LongpollPoll, AudDError> {
316 if !opts.skip_callback_check {
317 self.preflight_callback().await?;
318 }
319 Ok(spawn_longpoll(LongpollDriver::Authenticated {
320 http: self.inner.http.clone(),
321 url: format!("{}/longpoll/", self.inner.api_base),
322 policy: self.inner.read_policy(),
323 category: category.to_string(),
324 opts,
325 }))
326 }
327
328 async fn preflight_callback(&self) -> Result<(), AudDError> {
329 match self.get_callback_url().await {
330 Ok(_) => Ok(()),
331 Err(e) if e.error_code() == Some(NO_CALLBACK_ERROR_CODE) => {
332 let (http_status, request_id) = match &e {
333 AudDError::Api {
334 http_status,
335 request_id,
336 ..
337 } => (*http_status, request_id.clone()),
338 _ => (0, None),
339 };
340 Err(AudDError::Api {
341 code: 0,
342 message: PREFLIGHT_NO_CALLBACK_HINT.to_string(),
343 kind: ErrorKind::InvalidRequest,
344 http_status,
345 request_id,
346 requested_params: std::collections::HashMap::new(),
347 request_method: None,
348 branded_message: None,
349 raw_response: Value::Null,
350 })
351 }
352 Err(other) => Err(other),
353 }
354 }
355}
356
357pub(crate) enum LongpollDriver {
360 Authenticated {
361 http: HttpClient,
362 url: String,
363 policy: RetryPolicy,
364 category: String,
365 opts: LongpollOptions,
366 },
367 Tokenless {
368 http: crate::http::BareHttpClient,
369 url: String,
370 policy: RetryPolicy,
371 category: String,
372 since_time: Option<i64>,
373 timeout: i64,
374 },
375}
376
377impl LongpollDriver {
378 fn category(&self) -> &str {
379 match self {
380 Self::Authenticated { category, .. } | Self::Tokenless { category, .. } => category,
381 }
382 }
383
384 fn timeout(&self) -> i64 {
385 match self {
386 Self::Authenticated { opts, .. } => opts.timeout,
387 Self::Tokenless { timeout, .. } => *timeout,
388 }
389 }
390
391 fn since_time(&self) -> Option<i64> {
392 match self {
393 Self::Authenticated { opts, .. } => opts.since_time,
394 Self::Tokenless { since_time, .. } => *since_time,
395 }
396 }
397
398 async fn fetch(
399 &self,
400 params: &[(&str, String)],
401 ) -> Result<crate::http::HttpResponse, AudDError> {
402 match self {
403 Self::Authenticated {
404 http, url, policy, ..
405 } => {
406 let url = url.clone();
407 let policy = *policy;
408 let http = http.clone();
409 let params: Vec<(&str, String)> = params.iter().map(|(k, v)| (*k, v.clone())).collect();
410 retry_async(
411 || {
412 let http = http.clone();
413 let url = url.clone();
414 let params = params.clone();
415 async move { http.get(&url, ¶ms, None).await }
416 },
417 policy,
418 )
419 .await
420 }
421 Self::Tokenless {
422 http, url, policy, ..
423 } => {
424 let url = url.clone();
425 let policy = *policy;
426 let http = http.clone();
427 let params: Vec<(&str, String)> = params.iter().map(|(k, v)| (*k, v.clone())).collect();
428 retry_async(
429 || {
430 let http = http.clone();
431 let url = url.clone();
432 let params = params.clone();
433 async move { http.get(&url, ¶ms).await }
434 },
435 policy,
436 )
437 .await
438 }
439 }
440 }
441}
442
443pub(crate) fn spawn_longpoll(driver: LongpollDriver) -> LongpollPoll {
445 let (match_tx, match_rx) = mpsc::channel::<StreamCallbackMatch>(CHANNEL_BUFFER);
446 let (notif_tx, notif_rx) = mpsc::channel::<StreamCallbackNotification>(CHANNEL_BUFFER);
447 let (err_tx, err_rx) = mpsc::channel::<AudDError>(1);
448 let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
449
450 let join = tokio::spawn(run_longpoll(
451 driver,
452 match_tx,
453 notif_tx,
454 err_tx,
455 shutdown_rx,
456 ));
457
458 LongpollPoll {
459 matches: Box::pin(channel_stream(match_rx)),
460 notifications: Box::pin(channel_stream(notif_rx)),
461 errors: Box::pin(channel_stream(err_rx)),
462 shutdown: Some(shutdown_tx),
463 join: Some(join),
464 }
465}
466
467fn channel_stream<T: Send + 'static>(mut rx: mpsc::Receiver<T>) -> impl Stream<Item = T> + Send {
468 async_stream::stream! {
469 while let Some(item) = rx.recv().await {
470 yield item;
471 }
472 }
473}
474
475async fn run_longpoll(
479 driver: LongpollDriver,
480 match_tx: mpsc::Sender<StreamCallbackMatch>,
481 notif_tx: mpsc::Sender<StreamCallbackNotification>,
482 err_tx: mpsc::Sender<AudDError>,
483 mut shutdown_rx: mpsc::Receiver<()>,
484) {
485 let mut cur_since = driver.since_time();
486 let timeout_secs = driver.timeout().to_string();
487 let category = driver.category().to_string();
488
489 loop {
490 let mut params: Vec<(&str, String)> = vec![
492 ("category", category.clone()),
493 ("timeout", timeout_secs.clone()),
494 ];
495 if let Some(t) = cur_since {
496 params.push(("since_time", t.to_string()));
497 }
498
499 let resp = tokio::select! {
502 biased;
503 _ = shutdown_rx.recv() => return,
504 r = driver.fetch(¶ms) => r,
505 };
506
507 let resp = match resp {
508 Ok(r) => r,
509 Err(e) => {
510 let _ = err_tx.send(e).await;
511 return;
512 }
513 };
514
515 if resp.http_status >= HTTP_CLIENT_ERROR_FLOOR {
517 let _ = err_tx
518 .send(AudDError::Server {
519 http_status: resp.http_status,
520 message: format!("Longpoll endpoint returned HTTP {}", resp.http_status),
521 request_id: resp.request_id,
522 raw_response: resp.raw_text,
523 })
524 .await;
525 return;
526 }
527
528 let Some(body) = resp.json_body else {
529 let _ = err_tx
530 .send(AudDError::Serialization {
531 message: "Longpoll response was not a JSON object".into(),
532 raw_text: resp.raw_text,
533 })
534 .await;
535 return;
536 };
537
538 if is_longpoll_keepalive(&body) {
540 if let Some(ts) = body.get("timestamp").and_then(Value::as_i64) {
541 cur_since = Some(ts);
542 }
543 continue;
544 }
545
546 if let Some(ts) = body.get("timestamp").and_then(Value::as_i64) {
549 cur_since = Some(ts);
550 }
551
552 match parse_callback(body) {
553 Ok(CallbackEvent::Match(m)) => {
554 tokio::select! {
555 biased;
556 _ = shutdown_rx.recv() => return,
557 res = match_tx.send(m) => {
558 if res.is_err() { return; }
559 }
560 }
561 }
562 Ok(CallbackEvent::Notification(n)) => {
563 tokio::select! {
564 biased;
565 _ = shutdown_rx.recv() => return,
566 res = notif_tx.send(n) => {
567 if res.is_err() { return; }
568 }
569 }
570 }
571 Err(e) => {
572 let _ = err_tx.send(e).await;
573 return;
574 }
575 }
576 }
577}
578
579pub(crate) fn is_longpoll_keepalive(body: &Value) -> bool {
584 let Some(obj) = body.as_object() else {
585 return false;
586 };
587 if obj.contains_key("result") || obj.contains_key("notification") {
588 return false;
589 }
590 obj.contains_key("timeout")
591}
592
593async fn post_form(
596 http: &HttpClient,
597 url: &str,
598 fields: &[(&str, String)],
599 policy: RetryPolicy,
600) -> Result<Value, AudDError> {
601 let url = url.to_string();
602 let fields: Vec<(&str, String)> = fields.iter().map(|(k, v)| (*k, v.clone())).collect();
603 let resp = retry_async(
604 || {
605 let http = http.clone();
606 let url = url.clone();
607 let fields = fields.clone();
608 async move { http.post_form(&url, &fields, None, None).await }
609 },
610 policy,
611 )
612 .await?;
613 let body = decode_or_raise(resp, false)?;
614 Ok(body.get("result").cloned().unwrap_or(Value::Null))
615}
616
#[cfg(test)]
mod tests {
    use super::*;

    /// Defaults: no `since_time`, timeout 50, preflight check enabled.
    #[test]
    fn longpoll_options_default() {
        let o = LongpollOptions::default();
        assert_eq!(o.since_time, None);
        assert_eq!(o.timeout, 50);
        assert!(!o.skip_callback_check);
    }

    /// Each setter overrides exactly its own field when chained.
    #[test]
    fn longpoll_options_chain() {
        let o = LongpollOptions::default()
            .timeout(30)
            .since_time(123)
            .skip_callback_check(true);
        assert_eq!(o.timeout, 30);
        assert_eq!(o.since_time, Some(123));
        assert!(o.skip_callback_check);
    }

    /// A keepalive is an object with `timeout` and no `result`/`notification`.
    #[test]
    fn keepalive_detection() {
        let kp = serde_json::json!({"timeout": "no events before timeout", "timestamp": 1});
        assert!(is_longpoll_keepalive(&kp));

        let with_result = serde_json::json!({
            "result": {"radio_id": 1, "results": []},
            "timeout": "no events"
        });
        assert!(!is_longpoll_keepalive(&with_result));

        let with_notif = serde_json::json!({
            "notification": {"radio_id": 1},
            "timeout": "x"
        });
        assert!(!is_longpoll_keepalive(&with_notif));

        let no_timeout = serde_json::json!({"timestamp": 1});
        assert!(!is_longpoll_keepalive(&no_timeout));

        let not_object = serde_json::json!([1, 2, 3]);
        assert!(!is_longpoll_keepalive(&not_object));
    }
}