1use std::collections::HashMap;
6use std::pin::Pin;
7
8use futures_core::Stream;
9use serde_json::Value;
10use tokio::sync::mpsc;
11use tokio::task::JoinHandle;
12
13use crate::client::{decode_or_raise, AudDInner};
14use crate::errors::{AudDError, ErrorKind};
15use crate::helpers::{add_return_to_url, derive_longpoll_category, parse_callback};
16use crate::http::HttpClient;
17use crate::models::{
18 CallbackEvent, Stream as StreamRow, StreamCallbackMatch, StreamCallbackNotification,
19};
20use crate::retry::{retry_async, RetryPolicy};
21
22const NO_CALLBACK_ERROR_CODE: i32 = 19;
25
26const HTTP_CLIENT_ERROR_FLOOR: u16 = 400;
27
28const PREFLIGHT_NO_CALLBACK_HINT: &str =
29 "Longpoll won't deliver events because no callback URL is configured for this account. \
30Set one first via streams.set_callback_url(...) — `https://audd.tech/empty/` is fine if \
31you only want longpolling and don't need a real receiver. \
32To skip this check, pass skip_callback_check=true.";
33
34const CHANNEL_BUFFER: usize = 16;
37
38type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
41
42#[derive(Debug, Clone)]
45pub struct LongpollOptions {
46 since_time: Option<i64>,
47 timeout: i64,
48 skip_callback_check: bool,
49}
50
51impl Default for LongpollOptions {
52 fn default() -> Self {
53 Self {
54 since_time: None,
55 timeout: 50,
56 skip_callback_check: false,
57 }
58 }
59}
60
61impl LongpollOptions {
62 #[must_use]
64 pub fn since_time(mut self, t: i64) -> Self {
65 self.since_time = Some(t);
66 self
67 }
68
69 #[must_use]
71 pub fn timeout(mut self, secs: i64) -> Self {
72 self.timeout = secs;
73 self
74 }
75
76 #[must_use]
78 pub fn skip_callback_check(mut self, skip: bool) -> Self {
79 self.skip_callback_check = skip;
80 self
81 }
82}
83
84pub struct LongpollPoll {
94 pub matches: BoxStream<StreamCallbackMatch>,
96 pub notifications: BoxStream<StreamCallbackNotification>,
98 pub errors: BoxStream<AudDError>,
100
101 shutdown: Option<mpsc::Sender<()>>,
102 join: Option<JoinHandle<()>>,
103}
104
105impl std::fmt::Debug for LongpollPoll {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 f.debug_struct("LongpollPoll").finish_non_exhaustive()
108 }
109}
110
111impl LongpollPoll {
112 pub async fn close(mut self) {
115 self.close_internal().await;
116 }
117
118 async fn close_internal(&mut self) {
119 self.shutdown.take();
121 if let Some(handle) = self.join.take() {
122 let _ = handle.await;
123 }
124 }
125}
126
127impl Drop for LongpollPoll {
128 fn drop(&mut self) {
129 self.shutdown.take();
133 if let Some(handle) = self.join.take() {
134 handle.abort();
135 }
136 }
137}
138
139pub struct Streams<'a> {
141 inner: &'a AudDInner,
142}
143
144impl<'a> Streams<'a> {
145 pub(crate) fn new(inner: &'a AudDInner) -> Self {
146 Self { inner }
147 }
148
149 pub async fn set_callback_url(
157 &self,
158 url: &str,
159 return_metadata: Option<&[String]>,
160 extra_parameters: Option<&HashMap<String, String>>,
161 ) -> Result<(), AudDError> {
162 let url = add_return_to_url(url, return_metadata)?;
163 let mut fields: Vec<(&str, String)> = Vec::new();
165 if let Some(extras) = extra_parameters {
166 for (k, v) in extras {
167 fields.push((k.as_str(), v.clone()));
168 }
169 }
170 fields.push(("url", url));
171 post_form(
172 &self.inner.http,
173 &format!("{}/setCallbackUrl/", self.inner.api_base),
174 &fields,
175 self.inner.mutating_policy(),
176 )
177 .await
178 .map(drop)
179 }
180
181 pub async fn get_callback_url(&self) -> Result<String, AudDError> {
187 let result = post_form(
188 &self.inner.http,
189 &format!("{}/getCallbackUrl/", self.inner.api_base),
190 &[],
191 self.inner.read_policy(),
192 )
193 .await?;
194 Ok(result
195 .as_str()
196 .map_or_else(|| result.to_string(), str::to_string))
197 }
198
199 pub async fn add(
210 &self,
211 url: &str,
212 radio_id: i64,
213 callbacks: Option<&str>,
214 extra_parameters: Option<&HashMap<String, String>>,
215 ) -> Result<(), AudDError> {
216 let mut fields: Vec<(&str, String)> = Vec::new();
218 if let Some(extras) = extra_parameters {
219 for (k, v) in extras {
220 fields.push((k.as_str(), v.clone()));
221 }
222 }
223 fields.push(("url", url.to_string()));
224 fields.push(("radio_id", radio_id.to_string()));
225 if let Some(cb) = callbacks {
226 fields.push(("callbacks", cb.to_string()));
227 }
228 post_form(
229 &self.inner.http,
230 &format!("{}/addStream/", self.inner.api_base),
231 &fields,
232 self.inner.mutating_policy(),
233 )
234 .await
235 .map(drop)
236 }
237
238 pub async fn set_url(&self, radio_id: i64, url: &str) -> Result<(), AudDError> {
244 post_form(
245 &self.inner.http,
246 &format!("{}/setStreamUrl/", self.inner.api_base),
247 &[("radio_id", radio_id.to_string()), ("url", url.to_string())],
248 self.inner.mutating_policy(),
249 )
250 .await
251 .map(drop)
252 }
253
254 pub async fn delete(&self, radio_id: i64) -> Result<(), AudDError> {
260 post_form(
261 &self.inner.http,
262 &format!("{}/deleteStream/", self.inner.api_base),
263 &[("radio_id", radio_id.to_string())],
264 self.inner.mutating_policy(),
265 )
266 .await
267 .map(drop)
268 }
269
270 pub async fn list(&self) -> Result<Vec<StreamRow>, AudDError> {
276 let result = post_form(
277 &self.inner.http,
278 &format!("{}/getStreams/", self.inner.api_base),
279 &[],
280 self.inner.read_policy(),
281 )
282 .await?;
283 if result.is_null() {
284 return Ok(Vec::new());
285 }
286 let v: Vec<StreamRow> =
287 serde_json::from_value(result.clone()).map_err(|e| AudDError::Serialization {
288 message: format!("could not parse getStreams result: {e}"),
289 raw_text: result.to_string(),
290 })?;
291 Ok(v)
292 }
293
294 #[must_use]
298 pub fn derive_longpoll_category(&self, radio_id: i64) -> String {
299 derive_longpoll_category(&self.inner.api_token(), radio_id)
300 }
301
302 pub fn parse_callback(&self, body: Value) -> Result<CallbackEvent, AudDError> {
309 parse_callback(body)
310 }
311
312 pub async fn longpoll(
330 &self,
331 category: &str,
332 opts: LongpollOptions,
333 ) -> Result<LongpollPoll, AudDError> {
334 if !opts.skip_callback_check {
335 self.preflight_callback().await?;
336 }
337 Ok(spawn_longpoll(LongpollDriver::Authenticated {
338 http: self.inner.http.clone(),
339 url: format!("{}/longpoll/", self.inner.api_base),
340 policy: self.inner.read_policy(),
341 category: category.to_string(),
342 opts,
343 }))
344 }
345
346 pub async fn longpoll_by_radio_id(
357 &self,
358 radio_id: i64,
359 opts: LongpollOptions,
360 ) -> Result<LongpollPoll, AudDError> {
361 let category = self.derive_longpoll_category(radio_id);
362 self.longpoll(&category, opts).await
363 }
364
365 async fn preflight_callback(&self) -> Result<(), AudDError> {
366 match self.get_callback_url().await {
367 Ok(_) => Ok(()),
368 Err(e) if e.error_code() == Some(NO_CALLBACK_ERROR_CODE) => {
369 let (http_status, request_id) = match &e {
370 AudDError::Api {
371 http_status,
372 request_id,
373 ..
374 } => (*http_status, request_id.clone()),
375 _ => (0, None),
376 };
377 Err(AudDError::Api {
378 code: 0,
379 message: PREFLIGHT_NO_CALLBACK_HINT.to_string(),
380 kind: ErrorKind::InvalidRequest,
381 http_status,
382 request_id,
383 requested_params: std::collections::HashMap::new(),
384 request_method: None,
385 branded_message: None,
386 raw_response: Value::Null,
387 })
388 }
389 Err(other) => Err(other),
390 }
391 }
392}
393
394pub(crate) enum LongpollDriver {
397 Authenticated {
398 http: HttpClient,
399 url: String,
400 policy: RetryPolicy,
401 category: String,
402 opts: LongpollOptions,
403 },
404 Tokenless {
405 http: crate::http::BareHttpClient,
406 url: String,
407 policy: RetryPolicy,
408 category: String,
409 since_time: Option<i64>,
410 timeout: i64,
411 },
412}
413
414impl LongpollDriver {
415 fn category(&self) -> &str {
416 match self {
417 Self::Authenticated { category, .. } | Self::Tokenless { category, .. } => category,
418 }
419 }
420
421 fn timeout(&self) -> i64 {
422 match self {
423 Self::Authenticated { opts, .. } => opts.timeout,
424 Self::Tokenless { timeout, .. } => *timeout,
425 }
426 }
427
428 fn since_time(&self) -> Option<i64> {
429 match self {
430 Self::Authenticated { opts, .. } => opts.since_time,
431 Self::Tokenless { since_time, .. } => *since_time,
432 }
433 }
434
435 async fn fetch(
436 &self,
437 params: &[(&str, String)],
438 ) -> Result<crate::http::HttpResponse, AudDError> {
439 match self {
440 Self::Authenticated {
441 http, url, policy, ..
442 } => {
443 let url = url.clone();
444 let policy = *policy;
445 let http = http.clone();
446 let params: Vec<(&str, String)> =
447 params.iter().map(|(k, v)| (*k, v.clone())).collect();
448 retry_async(
449 || {
450 let http = http.clone();
451 let url = url.clone();
452 let params = params.clone();
453 async move { http.get(&url, ¶ms, None).await }
454 },
455 policy,
456 )
457 .await
458 }
459 Self::Tokenless {
460 http, url, policy, ..
461 } => {
462 let url = url.clone();
463 let policy = *policy;
464 let http = http.clone();
465 let params: Vec<(&str, String)> =
466 params.iter().map(|(k, v)| (*k, v.clone())).collect();
467 retry_async(
468 || {
469 let http = http.clone();
470 let url = url.clone();
471 let params = params.clone();
472 async move { http.get(&url, ¶ms).await }
473 },
474 policy,
475 )
476 .await
477 }
478 }
479 }
480}
481
482pub(crate) fn spawn_longpoll(driver: LongpollDriver) -> LongpollPoll {
484 let (match_tx, match_rx) = mpsc::channel::<StreamCallbackMatch>(CHANNEL_BUFFER);
485 let (notif_tx, notif_rx) = mpsc::channel::<StreamCallbackNotification>(CHANNEL_BUFFER);
486 let (err_tx, err_rx) = mpsc::channel::<AudDError>(1);
487 let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
488
489 let join = tokio::spawn(run_longpoll(
490 driver,
491 match_tx,
492 notif_tx,
493 err_tx,
494 shutdown_rx,
495 ));
496
497 LongpollPoll {
498 matches: Box::pin(channel_stream(match_rx)),
499 notifications: Box::pin(channel_stream(notif_rx)),
500 errors: Box::pin(channel_stream(err_rx)),
501 shutdown: Some(shutdown_tx),
502 join: Some(join),
503 }
504}
505
506fn channel_stream<T: Send + 'static>(mut rx: mpsc::Receiver<T>) -> impl Stream<Item = T> + Send {
507 async_stream::stream! {
508 while let Some(item) = rx.recv().await {
509 yield item;
510 }
511 }
512}
513
514async fn run_longpoll(
518 driver: LongpollDriver,
519 match_tx: mpsc::Sender<StreamCallbackMatch>,
520 notif_tx: mpsc::Sender<StreamCallbackNotification>,
521 err_tx: mpsc::Sender<AudDError>,
522 mut shutdown_rx: mpsc::Receiver<()>,
523) {
524 let mut cur_since = driver.since_time();
525 let timeout_secs = driver.timeout().to_string();
526 let category = driver.category().to_string();
527
528 loop {
529 let mut params: Vec<(&str, String)> = vec![
531 ("category", category.clone()),
532 ("timeout", timeout_secs.clone()),
533 ];
534 if let Some(t) = cur_since {
535 params.push(("since_time", t.to_string()));
536 }
537
538 let resp = tokio::select! {
541 biased;
542 _ = shutdown_rx.recv() => return,
543 r = driver.fetch(¶ms) => r,
544 };
545
546 let resp = match resp {
547 Ok(r) => r,
548 Err(e) => {
549 let _ = err_tx.send(e).await;
550 return;
551 }
552 };
553
554 if resp.http_status >= HTTP_CLIENT_ERROR_FLOOR {
556 let _ = err_tx
557 .send(AudDError::Server {
558 http_status: resp.http_status,
559 message: format!("Longpoll endpoint returned HTTP {}", resp.http_status),
560 request_id: resp.request_id,
561 raw_response: resp.raw_text,
562 })
563 .await;
564 return;
565 }
566
567 let Some(body) = resp.json_body else {
568 let _ = err_tx
569 .send(AudDError::Serialization {
570 message: "Longpoll response was not a JSON object".into(),
571 raw_text: resp.raw_text,
572 })
573 .await;
574 return;
575 };
576
577 if is_longpoll_keepalive(&body) {
579 if let Some(ts) = body.get("timestamp").and_then(Value::as_i64) {
580 cur_since = Some(ts);
581 }
582 continue;
583 }
584
585 if let Some(ts) = body.get("timestamp").and_then(Value::as_i64) {
588 cur_since = Some(ts);
589 }
590
591 match parse_callback(body) {
592 Ok(CallbackEvent::Match(m)) => {
593 tokio::select! {
594 biased;
595 _ = shutdown_rx.recv() => return,
596 res = match_tx.send(m) => {
597 if res.is_err() { return; }
598 }
599 }
600 }
601 Ok(CallbackEvent::Notification(n)) => {
602 tokio::select! {
603 biased;
604 _ = shutdown_rx.recv() => return,
605 res = notif_tx.send(n) => {
606 if res.is_err() { return; }
607 }
608 }
609 }
610 Err(e) => {
611 let _ = err_tx.send(e).await;
612 return;
613 }
614 }
615 }
616}
617
618pub(crate) fn is_longpoll_keepalive(body: &Value) -> bool {
623 let Some(obj) = body.as_object() else {
624 return false;
625 };
626 if obj.contains_key("result") || obj.contains_key("notification") {
627 return false;
628 }
629 obj.contains_key("timeout")
630}
631
632async fn post_form(
635 http: &HttpClient,
636 url: &str,
637 fields: &[(&str, String)],
638 policy: RetryPolicy,
639) -> Result<Value, AudDError> {
640 let url = url.to_string();
641 let fields: Vec<(&str, String)> = fields.iter().map(|(k, v)| (*k, v.clone())).collect();
642 let resp = retry_async(
643 || {
644 let http = http.clone();
645 let url = url.clone();
646 let fields = fields.clone();
647 async move { http.post_form(&url, &fields, None, None).await }
648 },
649 policy,
650 )
651 .await?;
652 let body = decode_or_raise(resp, false)?;
653 Ok(body.get("result").cloned().unwrap_or(Value::Null))
654}
655
656#[cfg(test)]
657mod tests {
658 use super::*;
659
660 #[test]
661 fn longpoll_options_default() {
662 let o = LongpollOptions::default();
663 assert_eq!(o.timeout, 50);
664 assert!(!o.skip_callback_check);
665 }
666
667 #[test]
668 fn longpoll_options_chain() {
669 let o = LongpollOptions::default()
670 .timeout(30)
671 .since_time(123)
672 .skip_callback_check(true);
673 assert_eq!(o.timeout, 30);
674 assert_eq!(o.since_time, Some(123));
675 assert!(o.skip_callback_check);
676 }
677
678 #[test]
679 fn keepalive_detection() {
680 let kp = serde_json::json!({"timeout": "no events before timeout", "timestamp": 1});
681 assert!(is_longpoll_keepalive(&kp));
682
683 let with_result = serde_json::json!({
684 "result": {"radio_id": 1, "results": []},
685 "timeout": "no events"
686 });
687 assert!(!is_longpoll_keepalive(&with_result));
688
689 let with_notif = serde_json::json!({
690 "notification": {"radio_id": 1},
691 "timeout": "x"
692 });
693 assert!(!is_longpoll_keepalive(&with_notif));
694
695 let no_timeout = serde_json::json!({"timestamp": 1});
696 assert!(!is_longpoll_keepalive(&no_timeout));
697
698 let not_object = serde_json::json!([1, 2, 3]);
699 assert!(!is_longpoll_keepalive(¬_object));
700 }
701}