Skip to main content

audd/
streams.rs

1//! Streams namespace — set/get callback URL, addStream/listStreams/setStreamUrl/deleteStream,
2//! longpoll with default-on preflight (and `skip_callback_check` opt-out),
3//! `derive_longpoll_category`, `parse_callback`.
4
5use std::pin::Pin;
6
7use futures_core::Stream;
8use serde_json::Value;
9use tokio::sync::mpsc;
10use tokio::task::JoinHandle;
11
12use crate::client::{decode_or_raise, AudDInner};
13use crate::errors::{AudDError, ErrorKind};
14use crate::helpers::{add_return_to_url, derive_longpoll_category, parse_callback};
15use crate::http::HttpClient;
16use crate::models::{
17    CallbackEvent, Stream as StreamRow, StreamCallbackMatch, StreamCallbackNotification,
18};
19use crate::retry::{retry_async, RetryPolicy};
20
21/// Server returns error #19 from `getCallbackUrl` when no callback URL is
22/// configured. We treat this specifically as the "no-callback-set" signal.
23const NO_CALLBACK_ERROR_CODE: i32 = 19;
24
25const HTTP_CLIENT_ERROR_FLOOR: u16 = 400;
26
27const PREFLIGHT_NO_CALLBACK_HINT: &str =
28    "Longpoll won't deliver events because no callback URL is configured for this account. \
29Set one first via streams.set_callback_url(...) — `https://audd.tech/empty/` is fine if \
30you only want longpolling and don't need a real receiver. \
31To skip this check, pass skip_callback_check=true.";
32
33/// Channel buffer for each of `matches` / `notifications` / `errors`. Small —
34/// we want to apply backpressure to the poll loop when the consumer is slow.
35const CHANNEL_BUFFER: usize = 16;
36
37/// Boxed stream alias kept inline (avoid pulling `futures_util::stream::BoxStream`
38/// just for the type name).
39type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
40
41/// Longpoll subscription configuration. Construct via [`Streams::longpoll`] and
42/// chain builder-style overrides.
43#[derive(Debug, Clone)]
44pub struct LongpollOptions {
45    since_time: Option<i64>,
46    timeout: i64,
47    skip_callback_check: bool,
48}
49
50impl Default for LongpollOptions {
51    fn default() -> Self {
52        Self {
53            since_time: None,
54            timeout: 50,
55            skip_callback_check: false,
56        }
57    }
58}
59
60impl LongpollOptions {
61    /// Set `since_time` (Unix-millis cursor returned by the server).
62    #[must_use]
63    pub fn since_time(mut self, t: i64) -> Self {
64        self.since_time = Some(t);
65        self
66    }
67
68    /// Set the per-request long-poll timeout in seconds (server-side cap; default 50).
69    #[must_use]
70    pub fn timeout(mut self, secs: i64) -> Self {
71        self.timeout = secs;
72        self
73    }
74
75    /// Skip the `getCallbackUrl` preflight check.
76    #[must_use]
77    pub fn skip_callback_check(mut self, skip: bool) -> Self {
78        self.skip_callback_check = skip;
79        self
80    }
81}
82
83/// An active longpoll subscription. Three typed streams surface its output:
84///
85/// * [`Self::matches`] — recognition matches.
86/// * [`Self::notifications`] — stream-lifecycle events.
87/// * [`Self::errors`] — yields a single terminal error then closes; after an
88///   error fires, `matches` and `notifications` close too.
89///
90/// Drop the [`LongpollPoll`] (or call [`Self::close`]) to tear down the
91/// background poller.
92pub struct LongpollPoll {
93    /// Recognition matches.
94    pub matches: BoxStream<StreamCallbackMatch>,
95    /// Stream-lifecycle notifications (e.g. `stream stopped`, `can't connect`).
96    pub notifications: BoxStream<StreamCallbackNotification>,
97    /// Terminal-error stream — yields at most one error and closes.
98    pub errors: BoxStream<AudDError>,
99
100    shutdown: Option<mpsc::Sender<()>>,
101    join: Option<JoinHandle<()>>,
102}
103
104impl std::fmt::Debug for LongpollPoll {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        f.debug_struct("LongpollPoll").finish_non_exhaustive()
107    }
108}
109
110impl LongpollPoll {
111    /// Stop the background poll and wait for it to drain. Idempotent (only
112    /// the first call performs the shutdown; subsequent calls are no-ops).
113    pub async fn close(mut self) {
114        self.close_internal().await;
115    }
116
117    async fn close_internal(&mut self) {
118        // Drop the sender to signal shutdown.
119        self.shutdown.take();
120        if let Some(handle) = self.join.take() {
121            let _ = handle.await;
122        }
123    }
124}
125
126impl Drop for LongpollPoll {
127    fn drop(&mut self) {
128        // On drop, take the sender so the background task exits its loop.
129        // We don't await the join handle — best-effort cleanup. Callers who
130        // want deterministic shutdown should call `close().await` first.
131        self.shutdown.take();
132        if let Some(handle) = self.join.take() {
133            handle.abort();
134        }
135    }
136}
137
138/// Streams namespace. Reach via [`crate::AudD::streams`].
139pub struct Streams<'a> {
140    inner: &'a AudDInner,
141}
142
143impl<'a> Streams<'a> {
144    pub(crate) fn new(inner: &'a AudDInner) -> Self {
145        Self { inner }
146    }
147
148    /// Set the callback URL on the caller's account. If `return_metadata` is
149    /// provided, it's appended as a `?return=...` query parameter to the URL.
150    /// Refuses to silently overwrite an existing `return=` parameter.
151    ///
152    /// # Errors
153    ///
154    /// Returns [`AudDError`] for transport, server, or input-conflict failures.
155    pub async fn set_callback_url(
156        &self,
157        url: &str,
158        return_metadata: Option<&[String]>,
159    ) -> Result<(), AudDError> {
160        let url = add_return_to_url(url, return_metadata)?;
161        post_form(
162            &self.inner.http,
163            &format!("{}/setCallbackUrl/", self.inner.api_base),
164            &[("url", url)],
165            self.inner.mutating_policy(),
166        )
167        .await
168        .map(drop)
169    }
170
171    /// Read the currently-configured callback URL.
172    ///
173    /// # Errors
174    ///
175    /// Returns [`AudDError::Api`] with code 19 if no callback URL is configured.
176    pub async fn get_callback_url(&self) -> Result<String, AudDError> {
177        let result = post_form(
178            &self.inner.http,
179            &format!("{}/getCallbackUrl/", self.inner.api_base),
180            &[],
181            self.inner.read_policy(),
182        )
183        .await?;
184        Ok(result
185            .as_str()
186            .map_or_else(|| result.to_string(), str::to_string))
187    }
188
189    /// Add a stream subscription.
190    ///
191    /// `url` accepts direct stream URLs (DASH, Icecast, HLS, m3u/m3u8) and
192    /// shortcuts like `twitch:<channel>`, `youtube:<video_id>`,
193    /// `youtube-ch:<channel_id>`. Pass `callbacks=Some("before")` to deliver
194    /// callbacks at song start instead of song end.
195    ///
196    /// # Errors
197    ///
198    /// Returns [`AudDError`] for transport/server failures.
199    pub async fn add(
200        &self,
201        url: &str,
202        radio_id: i64,
203        callbacks: Option<&str>,
204    ) -> Result<(), AudDError> {
205        let mut fields: Vec<(&str, String)> =
206            vec![("url", url.to_string()), ("radio_id", radio_id.to_string())];
207        if let Some(cb) = callbacks {
208            fields.push(("callbacks", cb.to_string()));
209        }
210        post_form(
211            &self.inner.http,
212            &format!("{}/addStream/", self.inner.api_base),
213            &fields,
214            self.inner.mutating_policy(),
215        )
216        .await
217        .map(drop)
218    }
219
220    /// Update the URL of an existing stream subscription.
221    ///
222    /// # Errors
223    ///
224    /// Returns [`AudDError`] for transport/server failures.
225    pub async fn set_url(&self, radio_id: i64, url: &str) -> Result<(), AudDError> {
226        post_form(
227            &self.inner.http,
228            &format!("{}/setStreamUrl/", self.inner.api_base),
229            &[("radio_id", radio_id.to_string()), ("url", url.to_string())],
230            self.inner.mutating_policy(),
231        )
232        .await
233        .map(drop)
234    }
235
236    /// Delete a stream subscription.
237    ///
238    /// # Errors
239    ///
240    /// Returns [`AudDError`] for transport/server failures.
241    pub async fn delete(&self, radio_id: i64) -> Result<(), AudDError> {
242        post_form(
243            &self.inner.http,
244            &format!("{}/deleteStream/", self.inner.api_base),
245            &[("radio_id", radio_id.to_string())],
246            self.inner.mutating_policy(),
247        )
248        .await
249        .map(drop)
250    }
251
252    /// List all stream subscriptions on the caller's account.
253    ///
254    /// # Errors
255    ///
256    /// Returns [`AudDError`] for transport/server/parse failures.
257    pub async fn list(&self) -> Result<Vec<StreamRow>, AudDError> {
258        let result = post_form(
259            &self.inner.http,
260            &format!("{}/getStreams/", self.inner.api_base),
261            &[],
262            self.inner.read_policy(),
263        )
264        .await?;
265        if result.is_null() {
266            return Ok(Vec::new());
267        }
268        let v: Vec<StreamRow> =
269            serde_json::from_value(result.clone()).map_err(|e| AudDError::Serialization {
270                message: format!("could not parse getStreams result: {e}"),
271                raw_text: result.to_string(),
272            })?;
273        Ok(v)
274    }
275
276    /// Compute the 9-char longpoll category locally from `(api_token, radio_id)`.
277    /// Pure function — no network call. Snapshots the live api_token, so it
278    /// reflects any prior `AudD::set_api_token` rotation.
279    #[must_use]
280    pub fn derive_longpoll_category(&self, radio_id: i64) -> String {
281        derive_longpoll_category(&self.inner.api_token(), radio_id)
282    }
283
284    /// Parse an already-deserialized callback POST body into a typed
285    /// [`CallbackEvent`].
286    ///
287    /// # Errors
288    ///
289    /// Returns [`AudDError::Serialization`] if the body doesn't deserialize.
290    pub fn parse_callback(&self, body: Value) -> Result<CallbackEvent, AudDError> {
291        parse_callback(body)
292    }
293
294    /// Long-poll the AudD streams endpoint and return a [`LongpollPoll`]
295    /// handle whose typed streams (matches / notifications / errors) are
296    /// fed by a background tokio task.
297    ///
298    /// Server keepalive ticks (`{"timeout": "no events before timeout"}`) are
299    /// silently absorbed — they advance the internal cursor and never reach
300    /// the consumer.
301    ///
302    /// On entry, performs a one-time `getCallbackUrl` preflight unless
303    /// `opts.skip_callback_check == true`. If the server returns error #19
304    /// (no callback URL configured), [`AudDError::Api`] is returned with kind
305    /// [`ErrorKind::InvalidRequest`] explaining how to fix it.
306    ///
307    /// # Errors
308    ///
309    /// Returns [`AudDError`] from the preflight only. Fatal errors during
310    /// polling surface on [`LongpollPoll::errors`].
311    pub async fn longpoll(
312        &self,
313        category: &str,
314        opts: LongpollOptions,
315    ) -> Result<LongpollPoll, AudDError> {
316        if !opts.skip_callback_check {
317            self.preflight_callback().await?;
318        }
319        Ok(spawn_longpoll(LongpollDriver::Authenticated {
320            http: self.inner.http.clone(),
321            url: format!("{}/longpoll/", self.inner.api_base),
322            policy: self.inner.read_policy(),
323            category: category.to_string(),
324            opts,
325        }))
326    }
327
328    async fn preflight_callback(&self) -> Result<(), AudDError> {
329        match self.get_callback_url().await {
330            Ok(_) => Ok(()),
331            Err(e) if e.error_code() == Some(NO_CALLBACK_ERROR_CODE) => {
332                let (http_status, request_id) = match &e {
333                    AudDError::Api {
334                        http_status,
335                        request_id,
336                        ..
337                    } => (*http_status, request_id.clone()),
338                    _ => (0, None),
339                };
340                Err(AudDError::Api {
341                    code: 0,
342                    message: PREFLIGHT_NO_CALLBACK_HINT.to_string(),
343                    kind: ErrorKind::InvalidRequest,
344                    http_status,
345                    request_id,
346                    requested_params: std::collections::HashMap::new(),
347                    request_method: None,
348                    branded_message: None,
349                    raw_response: Value::Null,
350                })
351            }
352            Err(other) => Err(other),
353        }
354    }
355}
356
357/// Internal — describes a single longpoll fetch source so authenticated and
358/// tokenless consumers share the same dispatch loop.
359pub(crate) enum LongpollDriver {
360    Authenticated {
361        http: HttpClient,
362        url: String,
363        policy: RetryPolicy,
364        category: String,
365        opts: LongpollOptions,
366    },
367    Tokenless {
368        http: crate::http::BareHttpClient,
369        url: String,
370        policy: RetryPolicy,
371        category: String,
372        since_time: Option<i64>,
373        timeout: i64,
374    },
375}
376
377impl LongpollDriver {
378    fn category(&self) -> &str {
379        match self {
380            Self::Authenticated { category, .. } | Self::Tokenless { category, .. } => category,
381        }
382    }
383
384    fn timeout(&self) -> i64 {
385        match self {
386            Self::Authenticated { opts, .. } => opts.timeout,
387            Self::Tokenless { timeout, .. } => *timeout,
388        }
389    }
390
391    fn since_time(&self) -> Option<i64> {
392        match self {
393            Self::Authenticated { opts, .. } => opts.since_time,
394            Self::Tokenless { since_time, .. } => *since_time,
395        }
396    }
397
398    async fn fetch(
399        &self,
400        params: &[(&str, String)],
401    ) -> Result<crate::http::HttpResponse, AudDError> {
402        match self {
403            Self::Authenticated {
404                http, url, policy, ..
405            } => {
406                let url = url.clone();
407                let policy = *policy;
408                let http = http.clone();
409                let params: Vec<(&str, String)> = params.iter().map(|(k, v)| (*k, v.clone())).collect();
410                retry_async(
411                    || {
412                        let http = http.clone();
413                        let url = url.clone();
414                        let params = params.clone();
415                        async move { http.get(&url, &params, None).await }
416                    },
417                    policy,
418                )
419                .await
420            }
421            Self::Tokenless {
422                http, url, policy, ..
423            } => {
424                let url = url.clone();
425                let policy = *policy;
426                let http = http.clone();
427                let params: Vec<(&str, String)> = params.iter().map(|(k, v)| (*k, v.clone())).collect();
428                retry_async(
429                    || {
430                        let http = http.clone();
431                        let url = url.clone();
432                        let params = params.clone();
433                        async move { http.get(&url, &params).await }
434                    },
435                    policy,
436                )
437                .await
438            }
439        }
440    }
441}
442
443/// Spawn the background poll task and wire up the three streams.
444pub(crate) fn spawn_longpoll(driver: LongpollDriver) -> LongpollPoll {
445    let (match_tx, match_rx) = mpsc::channel::<StreamCallbackMatch>(CHANNEL_BUFFER);
446    let (notif_tx, notif_rx) = mpsc::channel::<StreamCallbackNotification>(CHANNEL_BUFFER);
447    let (err_tx, err_rx) = mpsc::channel::<AudDError>(1);
448    let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
449
450    let join = tokio::spawn(run_longpoll(
451        driver,
452        match_tx,
453        notif_tx,
454        err_tx,
455        shutdown_rx,
456    ));
457
458    LongpollPoll {
459        matches: Box::pin(channel_stream(match_rx)),
460        notifications: Box::pin(channel_stream(notif_rx)),
461        errors: Box::pin(channel_stream(err_rx)),
462        shutdown: Some(shutdown_tx),
463        join: Some(join),
464    }
465}
466
467fn channel_stream<T: Send + 'static>(mut rx: mpsc::Receiver<T>) -> impl Stream<Item = T> + Send {
468    async_stream::stream! {
469        while let Some(item) = rx.recv().await {
470            yield item;
471        }
472    }
473}
474
475/// Drive a single longpoll subscription: read responses, parse them, and
476/// dispatch to the typed channels. Exits when the shutdown signal is dropped,
477/// a fatal error fires, or all channels are closed.
478async fn run_longpoll(
479    driver: LongpollDriver,
480    match_tx: mpsc::Sender<StreamCallbackMatch>,
481    notif_tx: mpsc::Sender<StreamCallbackNotification>,
482    err_tx: mpsc::Sender<AudDError>,
483    mut shutdown_rx: mpsc::Receiver<()>,
484) {
485    let mut cur_since = driver.since_time();
486    let timeout_secs = driver.timeout().to_string();
487    let category = driver.category().to_string();
488
489    loop {
490        // Build params each iteration so since_time updates flow through.
491        let mut params: Vec<(&str, String)> = vec![
492            ("category", category.clone()),
493            ("timeout", timeout_secs.clone()),
494        ];
495        if let Some(t) = cur_since {
496            params.push(("since_time", t.to_string()));
497        }
498
499        // Race the fetch against shutdown so a quiescent caller can hang up
500        // even mid-poll.
501        let resp = tokio::select! {
502            biased;
503            _ = shutdown_rx.recv() => return,
504            r = driver.fetch(&params) => r,
505        };
506
507        let resp = match resp {
508            Ok(r) => r,
509            Err(e) => {
510                let _ = err_tx.send(e).await;
511                return;
512            }
513        };
514
515        // Surface non-2xx as a Server error.
516        if resp.http_status >= HTTP_CLIENT_ERROR_FLOOR {
517            let _ = err_tx
518                .send(AudDError::Server {
519                    http_status: resp.http_status,
520                    message: format!("Longpoll endpoint returned HTTP {}", resp.http_status),
521                    request_id: resp.request_id,
522                    raw_response: resp.raw_text,
523                })
524                .await;
525            return;
526        }
527
528        let Some(body) = resp.json_body else {
529            let _ = err_tx
530                .send(AudDError::Serialization {
531                    message: "Longpoll response was not a JSON object".into(),
532                    raw_text: resp.raw_text,
533                })
534                .await;
535            return;
536        };
537
538        // Silently absorb keepalive ticks.
539        if is_longpoll_keepalive(&body) {
540            if let Some(ts) = body.get("timestamp").and_then(Value::as_i64) {
541                cur_since = Some(ts);
542            }
543            continue;
544        }
545
546        // Advance the cursor before parsing — even if parsing fails, we don't
547        // want to re-poll the same window.
548        if let Some(ts) = body.get("timestamp").and_then(Value::as_i64) {
549            cur_since = Some(ts);
550        }
551
552        match parse_callback(body) {
553            Ok(CallbackEvent::Match(m)) => {
554                tokio::select! {
555                    biased;
556                    _ = shutdown_rx.recv() => return,
557                    res = match_tx.send(m) => {
558                        if res.is_err() { return; }
559                    }
560                }
561            }
562            Ok(CallbackEvent::Notification(n)) => {
563                tokio::select! {
564                    biased;
565                    _ = shutdown_rx.recv() => return,
566                    res = notif_tx.send(n) => {
567                        if res.is_err() { return; }
568                    }
569                }
570            }
571            Err(e) => {
572                let _ = err_tx.send(e).await;
573                return;
574            }
575        }
576    }
577}
578
579/// Reports whether `body` is a `{"timeout": "no events before timeout"}`
580/// keepalive tick — the server emits one of these every `<timeout>` seconds
581/// when no recognition or notification is queued. Mirrors audd-go's
582/// `isLongpollKeepalive` helper.
583pub(crate) fn is_longpoll_keepalive(body: &Value) -> bool {
584    let Some(obj) = body.as_object() else {
585        return false;
586    };
587    if obj.contains_key("result") || obj.contains_key("notification") {
588        return false;
589    }
590    obj.contains_key("timeout")
591}
592
593/// Internal — POST a form body to a streams-namespace endpoint and return the
594/// `result` field on success.
595async fn post_form(
596    http: &HttpClient,
597    url: &str,
598    fields: &[(&str, String)],
599    policy: RetryPolicy,
600) -> Result<Value, AudDError> {
601    let url = url.to_string();
602    let fields: Vec<(&str, String)> = fields.iter().map(|(k, v)| (*k, v.clone())).collect();
603    let resp = retry_async(
604        || {
605            let http = http.clone();
606            let url = url.clone();
607            let fields = fields.clone();
608            async move { http.post_form(&url, &fields, None, None).await }
609        },
610        policy,
611    )
612    .await?;
613    let body = decode_or_raise(resp, false)?;
614    Ok(body.get("result").cloned().unwrap_or(Value::Null))
615}
616
617#[cfg(test)]
618mod tests {
619    use super::*;
620
621    #[test]
622    fn longpoll_options_default() {
623        let o = LongpollOptions::default();
624        assert_eq!(o.timeout, 50);
625        assert!(!o.skip_callback_check);
626    }
627
628    #[test]
629    fn longpoll_options_chain() {
630        let o = LongpollOptions::default()
631            .timeout(30)
632            .since_time(123)
633            .skip_callback_check(true);
634        assert_eq!(o.timeout, 30);
635        assert_eq!(o.since_time, Some(123));
636        assert!(o.skip_callback_check);
637    }
638
639    #[test]
640    fn keepalive_detection() {
641        let kp = serde_json::json!({"timeout": "no events before timeout", "timestamp": 1});
642        assert!(is_longpoll_keepalive(&kp));
643
644        let with_result = serde_json::json!({
645            "result": {"radio_id": 1, "results": []},
646            "timeout": "no events"
647        });
648        assert!(!is_longpoll_keepalive(&with_result));
649
650        let with_notif = serde_json::json!({
651            "notification": {"radio_id": 1},
652            "timeout": "x"
653        });
654        assert!(!is_longpoll_keepalive(&with_notif));
655
656        let no_timeout = serde_json::json!({"timestamp": 1});
657        assert!(!is_longpoll_keepalive(&no_timeout));
658
659        let not_object = serde_json::json!([1, 2, 3]);
660        assert!(!is_longpoll_keepalive(&not_object));
661    }
662}