Skip to main content

audd/
streams.rs

1//! Streams namespace — set/get callback URL, addStream/listStreams/setStreamUrl/deleteStream,
2//! longpoll with default-on preflight (and `skip_callback_check` opt-out),
3//! `derive_longpoll_category`, `parse_callback`.
4
5use std::collections::HashMap;
6use std::pin::Pin;
7
8use futures_core::Stream;
9use serde_json::Value;
10use tokio::sync::mpsc;
11use tokio::task::JoinHandle;
12
13use crate::client::{decode_or_raise, AudDInner};
14use crate::errors::{AudDError, ErrorKind};
15use crate::helpers::{add_return_to_url, derive_longpoll_category, parse_callback};
16use crate::http::HttpClient;
17use crate::models::{
18    CallbackEvent, Stream as StreamRow, StreamCallbackMatch, StreamCallbackNotification,
19};
20use crate::retry::{retry_async, RetryPolicy};
21
/// Server returns error #19 from `getCallbackUrl` when no callback URL is
/// configured. We treat this specifically as the "no-callback-set" signal.
const NO_CALLBACK_ERROR_CODE: i32 = 19;

/// Lowest HTTP status the poll loop treats as a failure: a longpoll response
/// whose status is at or above this value is reported as a server error.
const HTTP_CLIENT_ERROR_FLOOR: u16 = 400;

/// Message carried by the error that the longpoll preflight returns when the
/// account has no callback URL configured.
const PREFLIGHT_NO_CALLBACK_HINT: &str =
    "Longpoll won't deliver events because no callback URL is configured for this account. \
Set one first via streams.set_callback_url(...) — `https://audd.tech/empty/` is fine if \
you only want longpolling and don't need a real receiver. \
To skip this check, pass skip_callback_check=true.";

/// Channel buffer for the `matches` and `notifications` channels (the error
/// channel is created separately with room for a single item). Small — we
/// want to apply backpressure to the poll loop when the consumer is slow.
const CHANNEL_BUFFER: usize = 16;

/// Boxed stream alias kept inline (avoid pulling `futures_util::stream::BoxStream`
/// just for the type name).
type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
41
/// Longpoll subscription configuration.
///
/// Start from [`LongpollOptions::default`], chain the builder-style setters
/// below, and hand the result to [`Streams::longpoll`].
#[derive(Debug, Clone)]
pub struct LongpollOptions {
    /// Cursor sent as `since_time`; `None` leaves the parameter out.
    since_time: Option<i64>,
    /// Per-request long-poll timeout, in seconds.
    timeout: i64,
    /// When `true`, the `getCallbackUrl` preflight is not performed.
    skip_callback_check: bool,
}

impl Default for LongpollOptions {
    /// No cursor, a 50-second timeout, and the callback preflight enabled.
    fn default() -> Self {
        let (since_time, timeout, skip_callback_check) = (None, 50, false);
        Self {
            since_time,
            timeout,
            skip_callback_check,
        }
    }
}

impl LongpollOptions {
    /// Set `since_time` (Unix-millis cursor returned by the server).
    #[must_use]
    pub fn since_time(self, t: i64) -> Self {
        Self {
            since_time: Some(t),
            ..self
        }
    }

    /// Set the per-request long-poll timeout in seconds (server-side cap; default 50).
    #[must_use]
    pub fn timeout(self, secs: i64) -> Self {
        Self {
            timeout: secs,
            ..self
        }
    }

    /// Skip the `getCallbackUrl` preflight check.
    #[must_use]
    pub fn skip_callback_check(self, skip: bool) -> Self {
        Self {
            skip_callback_check: skip,
            ..self
        }
    }
}
83
/// An active longpoll subscription. Three typed streams surface its output:
///
/// * [`Self::matches`] — recognition matches.
/// * [`Self::notifications`] — stream-lifecycle events.
/// * [`Self::errors`] — yields a single terminal error then closes; after an
///   error fires, `matches` and `notifications` close too.
///
/// Drop the [`LongpollPoll`] (or call [`Self::close`]) to tear down the
/// background poller.
pub struct LongpollPoll {
    /// Recognition matches.
    pub matches: BoxStream<StreamCallbackMatch>,
    /// Stream-lifecycle notifications (e.g. `stream stopped`, `can't connect`).
    pub notifications: BoxStream<StreamCallbackNotification>,
    /// Terminal-error stream — yields at most one error and closes.
    pub errors: BoxStream<AudDError>,

    /// Sender half of the shutdown channel. Nothing is ever sent on it;
    /// taking and dropping it is the signal that makes the poll task return.
    shutdown: Option<mpsc::Sender<()>>,
    /// Handle of the spawned poll task; awaited by `close`, aborted on drop.
    join: Option<JoinHandle<()>>,
}
104
105impl std::fmt::Debug for LongpollPoll {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        f.debug_struct("LongpollPoll").finish_non_exhaustive()
108    }
109}
110
111impl LongpollPoll {
112    /// Stop the background poll and wait for it to drain. Idempotent (only
113    /// the first call performs the shutdown; subsequent calls are no-ops).
114    pub async fn close(mut self) {
115        self.close_internal().await;
116    }
117
118    async fn close_internal(&mut self) {
119        // Drop the sender to signal shutdown.
120        self.shutdown.take();
121        if let Some(handle) = self.join.take() {
122            let _ = handle.await;
123        }
124    }
125}
126
127impl Drop for LongpollPoll {
128    fn drop(&mut self) {
129        // On drop, take the sender so the background task exits its loop.
130        // We don't await the join handle — best-effort cleanup. Callers who
131        // want deterministic shutdown should call `close().await` first.
132        self.shutdown.take();
133        if let Some(handle) = self.join.take() {
134            handle.abort();
135        }
136    }
137}
138
/// Streams namespace. Reach via [`crate::AudD::streams`].
///
/// A borrowed view over the client's shared state; every method reads the
/// HTTP client, API base URL, and retry policies from `inner`.
pub struct Streams<'a> {
    /// Shared client internals this namespace operates on.
    inner: &'a AudDInner,
}
143
144impl<'a> Streams<'a> {
    /// Wrap the shared client internals in a `Streams` view (crate-internal).
    pub(crate) fn new(inner: &'a AudDInner) -> Self {
        Self { inner }
    }
148
149    /// Set the callback URL on the caller's account. If `return_metadata` is
150    /// provided, it's appended as a `?return=...` query parameter to the URL.
151    /// Refuses to silently overwrite an existing `return=` parameter.
152    ///
153    /// # Errors
154    ///
155    /// Returns [`AudDError`] for transport, server, or input-conflict failures.
156    pub async fn set_callback_url(
157        &self,
158        url: &str,
159        return_metadata: Option<&[String]>,
160        extra_parameters: Option<&HashMap<String, String>>,
161    ) -> Result<(), AudDError> {
162        let url = add_return_to_url(url, return_metadata)?;
163        // extra_parameters first; typed fields (`url`) win on collision.
164        let mut fields: Vec<(&str, String)> = Vec::new();
165        if let Some(extras) = extra_parameters {
166            for (k, v) in extras {
167                fields.push((k.as_str(), v.clone()));
168            }
169        }
170        fields.push(("url", url));
171        post_form(
172            &self.inner.http,
173            &format!("{}/setCallbackUrl/", self.inner.api_base),
174            &fields,
175            self.inner.mutating_policy(),
176        )
177        .await
178        .map(drop)
179    }
180
181    /// Read the currently-configured callback URL.
182    ///
183    /// # Errors
184    ///
185    /// Returns [`AudDError::Api`] with code 19 if no callback URL is configured.
186    pub async fn get_callback_url(&self) -> Result<String, AudDError> {
187        let result = post_form(
188            &self.inner.http,
189            &format!("{}/getCallbackUrl/", self.inner.api_base),
190            &[],
191            self.inner.read_policy(),
192        )
193        .await?;
194        Ok(result
195            .as_str()
196            .map_or_else(|| result.to_string(), str::to_string))
197    }
198
199    /// Add a stream subscription.
200    ///
201    /// `url` accepts direct stream URLs (DASH, Icecast, HLS, m3u/m3u8) and
202    /// shortcuts like `twitch:<channel>`, `youtube:<video_id>`,
203    /// `youtube-ch:<channel_id>`. Pass `callbacks=Some("before")` to deliver
204    /// callbacks at song start instead of song end.
205    ///
206    /// # Errors
207    ///
208    /// Returns [`AudDError`] for transport/server failures.
209    pub async fn add(
210        &self,
211        url: &str,
212        radio_id: i64,
213        callbacks: Option<&str>,
214        extra_parameters: Option<&HashMap<String, String>>,
215    ) -> Result<(), AudDError> {
216        // extra_parameters first; typed fields win on collision.
217        let mut fields: Vec<(&str, String)> = Vec::new();
218        if let Some(extras) = extra_parameters {
219            for (k, v) in extras {
220                fields.push((k.as_str(), v.clone()));
221            }
222        }
223        fields.push(("url", url.to_string()));
224        fields.push(("radio_id", radio_id.to_string()));
225        if let Some(cb) = callbacks {
226            fields.push(("callbacks", cb.to_string()));
227        }
228        post_form(
229            &self.inner.http,
230            &format!("{}/addStream/", self.inner.api_base),
231            &fields,
232            self.inner.mutating_policy(),
233        )
234        .await
235        .map(drop)
236    }
237
238    /// Update the URL of an existing stream subscription.
239    ///
240    /// # Errors
241    ///
242    /// Returns [`AudDError`] for transport/server failures.
243    pub async fn set_url(&self, radio_id: i64, url: &str) -> Result<(), AudDError> {
244        post_form(
245            &self.inner.http,
246            &format!("{}/setStreamUrl/", self.inner.api_base),
247            &[("radio_id", radio_id.to_string()), ("url", url.to_string())],
248            self.inner.mutating_policy(),
249        )
250        .await
251        .map(drop)
252    }
253
254    /// Delete a stream subscription.
255    ///
256    /// # Errors
257    ///
258    /// Returns [`AudDError`] for transport/server failures.
259    pub async fn delete(&self, radio_id: i64) -> Result<(), AudDError> {
260        post_form(
261            &self.inner.http,
262            &format!("{}/deleteStream/", self.inner.api_base),
263            &[("radio_id", radio_id.to_string())],
264            self.inner.mutating_policy(),
265        )
266        .await
267        .map(drop)
268    }
269
270    /// List all stream subscriptions on the caller's account.
271    ///
272    /// # Errors
273    ///
274    /// Returns [`AudDError`] for transport/server/parse failures.
275    pub async fn list(&self) -> Result<Vec<StreamRow>, AudDError> {
276        let result = post_form(
277            &self.inner.http,
278            &format!("{}/getStreams/", self.inner.api_base),
279            &[],
280            self.inner.read_policy(),
281        )
282        .await?;
283        if result.is_null() {
284            return Ok(Vec::new());
285        }
286        let v: Vec<StreamRow> =
287            serde_json::from_value(result.clone()).map_err(|e| AudDError::Serialization {
288                message: format!("could not parse getStreams result: {e}"),
289                raw_text: result.to_string(),
290            })?;
291        Ok(v)
292    }
293
294    /// Compute the 9-char longpoll category locally from `(api_token, radio_id)`.
295    /// Pure function — no network call. Snapshots the live api_token, so it
296    /// reflects any prior `AudD::set_api_token` rotation.
297    #[must_use]
298    pub fn derive_longpoll_category(&self, radio_id: i64) -> String {
299        derive_longpoll_category(&self.inner.api_token(), radio_id)
300    }
301
    /// Parse an already-deserialized callback POST body into a typed
    /// [`CallbackEvent`].
    ///
    /// Thin convenience wrapper: forwards `body` unchanged to the free
    /// `parse_callback` helper and reads no client state.
    ///
    /// # Errors
    ///
    /// Returns [`AudDError::Serialization`] if the body doesn't deserialize.
    pub fn parse_callback(&self, body: Value) -> Result<CallbackEvent, AudDError> {
        parse_callback(body)
    }
311
312    /// Long-poll the AudD streams endpoint and return a [`LongpollPoll`]
313    /// handle whose typed streams (matches / notifications / errors) are
314    /// fed by a background tokio task.
315    ///
316    /// Server keepalive ticks (`{"timeout": "no events before timeout"}`) are
317    /// silently absorbed — they advance the internal cursor and never reach
318    /// the consumer.
319    ///
320    /// On entry, performs a one-time `getCallbackUrl` preflight unless
321    /// `opts.skip_callback_check == true`. If the server returns error #19
322    /// (no callback URL configured), [`AudDError::Api`] is returned with kind
323    /// [`ErrorKind::InvalidRequest`] explaining how to fix it.
324    ///
325    /// # Errors
326    ///
327    /// Returns [`AudDError`] from the preflight only. Fatal errors during
328    /// polling surface on [`LongpollPoll::errors`].
329    pub async fn longpoll(
330        &self,
331        category: &str,
332        opts: LongpollOptions,
333    ) -> Result<LongpollPoll, AudDError> {
334        if !opts.skip_callback_check {
335            self.preflight_callback().await?;
336        }
337        Ok(spawn_longpoll(LongpollDriver::Authenticated {
338            http: self.inner.http.clone(),
339            url: format!("{}/longpoll/", self.inner.api_base),
340            policy: self.inner.read_policy(),
341            category: category.to_string(),
342            opts,
343        }))
344    }
345
    /// One-step longpoll for the common `(api_token, radio_id)` case: derives
    /// the 9-char category internally and delegates to [`Self::longpoll`].
    ///
    /// Use [`Self::longpoll`] directly for the tokenless / share-the-category
    /// flow (server derives, ships only the 9-char string to a browser /
    /// mobile / embedded client).
    ///
    /// # Errors
    ///
    /// Same as [`Self::longpoll`] — propagates any preflight error.
    pub async fn longpoll_by_radio_id(
        &self,
        radio_id: i64,
        opts: LongpollOptions,
    ) -> Result<LongpollPoll, AudDError> {
        // The category is computed locally from the current API token.
        let category = self.derive_longpoll_category(radio_id);
        self.longpoll(&category, opts).await
    }
364
365    async fn preflight_callback(&self) -> Result<(), AudDError> {
366        match self.get_callback_url().await {
367            Ok(_) => Ok(()),
368            Err(e) if e.error_code() == Some(NO_CALLBACK_ERROR_CODE) => {
369                let (http_status, request_id) = match &e {
370                    AudDError::Api {
371                        http_status,
372                        request_id,
373                        ..
374                    } => (*http_status, request_id.clone()),
375                    _ => (0, None),
376                };
377                Err(AudDError::Api {
378                    code: 0,
379                    message: PREFLIGHT_NO_CALLBACK_HINT.to_string(),
380                    kind: ErrorKind::InvalidRequest,
381                    http_status,
382                    request_id,
383                    requested_params: std::collections::HashMap::new(),
384                    request_method: None,
385                    branded_message: None,
386                    raw_response: Value::Null,
387                })
388            }
389            Err(other) => Err(other),
390        }
391    }
392}
393
/// Internal — describes a single longpoll fetch source so authenticated and
/// tokenless consumers share the same dispatch loop.
///
/// Both variants carry the same information (client, endpoint URL, retry
/// policy, category, cursor, timeout); they differ in the HTTP client type
/// and in whether the settings arrive bundled in a [`LongpollOptions`].
pub(crate) enum LongpollDriver {
    /// Polls through the regular [`HttpClient`].
    Authenticated {
        /// Client used for each poll request.
        http: HttpClient,
        /// Full longpoll endpoint URL.
        url: String,
        /// Retry policy applied to every poll request.
        policy: RetryPolicy,
        /// Longpoll category to subscribe to.
        category: String,
        /// Caller-supplied options (initial cursor, timeout, preflight flag).
        opts: LongpollOptions,
    },
    /// Polls through the `BareHttpClient`.
    Tokenless {
        /// Client used for each poll request.
        http: crate::http::BareHttpClient,
        /// Full longpoll endpoint URL.
        url: String,
        /// Retry policy applied to every poll request.
        policy: RetryPolicy,
        /// Longpoll category to subscribe to.
        category: String,
        /// Initial `since_time` cursor, if any.
        since_time: Option<i64>,
        /// Per-request long-poll timeout, in seconds.
        timeout: i64,
    },
}
413
414impl LongpollDriver {
415    fn category(&self) -> &str {
416        match self {
417            Self::Authenticated { category, .. } | Self::Tokenless { category, .. } => category,
418        }
419    }
420
421    fn timeout(&self) -> i64 {
422        match self {
423            Self::Authenticated { opts, .. } => opts.timeout,
424            Self::Tokenless { timeout, .. } => *timeout,
425        }
426    }
427
428    fn since_time(&self) -> Option<i64> {
429        match self {
430            Self::Authenticated { opts, .. } => opts.since_time,
431            Self::Tokenless { since_time, .. } => *since_time,
432        }
433    }
434
435    async fn fetch(
436        &self,
437        params: &[(&str, String)],
438    ) -> Result<crate::http::HttpResponse, AudDError> {
439        match self {
440            Self::Authenticated {
441                http, url, policy, ..
442            } => {
443                let url = url.clone();
444                let policy = *policy;
445                let http = http.clone();
446                let params: Vec<(&str, String)> =
447                    params.iter().map(|(k, v)| (*k, v.clone())).collect();
448                retry_async(
449                    || {
450                        let http = http.clone();
451                        let url = url.clone();
452                        let params = params.clone();
453                        async move { http.get(&url, &params, None).await }
454                    },
455                    policy,
456                )
457                .await
458            }
459            Self::Tokenless {
460                http, url, policy, ..
461            } => {
462                let url = url.clone();
463                let policy = *policy;
464                let http = http.clone();
465                let params: Vec<(&str, String)> =
466                    params.iter().map(|(k, v)| (*k, v.clone())).collect();
467                retry_async(
468                    || {
469                        let http = http.clone();
470                        let url = url.clone();
471                        let params = params.clone();
472                        async move { http.get(&url, &params).await }
473                    },
474                    policy,
475                )
476                .await
477            }
478        }
479    }
480}
481
482/// Spawn the background poll task and wire up the three streams.
483pub(crate) fn spawn_longpoll(driver: LongpollDriver) -> LongpollPoll {
484    let (match_tx, match_rx) = mpsc::channel::<StreamCallbackMatch>(CHANNEL_BUFFER);
485    let (notif_tx, notif_rx) = mpsc::channel::<StreamCallbackNotification>(CHANNEL_BUFFER);
486    let (err_tx, err_rx) = mpsc::channel::<AudDError>(1);
487    let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
488
489    let join = tokio::spawn(run_longpoll(
490        driver,
491        match_tx,
492        notif_tx,
493        err_tx,
494        shutdown_rx,
495    ));
496
497    LongpollPoll {
498        matches: Box::pin(channel_stream(match_rx)),
499        notifications: Box::pin(channel_stream(notif_rx)),
500        errors: Box::pin(channel_stream(err_rx)),
501        shutdown: Some(shutdown_tx),
502        join: Some(join),
503    }
504}
505
/// Adapt an mpsc receiver into a [`Stream`]: yields each received item in
/// order and ends once `recv()` returns `None`.
fn channel_stream<T: Send + 'static>(mut rx: mpsc::Receiver<T>) -> impl Stream<Item = T> + Send {
    async_stream::stream! {
        while let Some(item) = rx.recv().await {
            yield item;
        }
    }
}
513
514/// Drive a single longpoll subscription: read responses, parse them, and
515/// dispatch to the typed channels. Exits when the shutdown signal is dropped,
516/// a fatal error fires, or all channels are closed.
517async fn run_longpoll(
518    driver: LongpollDriver,
519    match_tx: mpsc::Sender<StreamCallbackMatch>,
520    notif_tx: mpsc::Sender<StreamCallbackNotification>,
521    err_tx: mpsc::Sender<AudDError>,
522    mut shutdown_rx: mpsc::Receiver<()>,
523) {
524    let mut cur_since = driver.since_time();
525    let timeout_secs = driver.timeout().to_string();
526    let category = driver.category().to_string();
527
528    loop {
529        // Build params each iteration so since_time updates flow through.
530        let mut params: Vec<(&str, String)> = vec![
531            ("category", category.clone()),
532            ("timeout", timeout_secs.clone()),
533        ];
534        if let Some(t) = cur_since {
535            params.push(("since_time", t.to_string()));
536        }
537
538        // Race the fetch against shutdown so a quiescent caller can hang up
539        // even mid-poll.
540        let resp = tokio::select! {
541            biased;
542            _ = shutdown_rx.recv() => return,
543            r = driver.fetch(&params) => r,
544        };
545
546        let resp = match resp {
547            Ok(r) => r,
548            Err(e) => {
549                let _ = err_tx.send(e).await;
550                return;
551            }
552        };
553
554        // Surface non-2xx as a Server error.
555        if resp.http_status >= HTTP_CLIENT_ERROR_FLOOR {
556            let _ = err_tx
557                .send(AudDError::Server {
558                    http_status: resp.http_status,
559                    message: format!("Longpoll endpoint returned HTTP {}", resp.http_status),
560                    request_id: resp.request_id,
561                    raw_response: resp.raw_text,
562                })
563                .await;
564            return;
565        }
566
567        let Some(body) = resp.json_body else {
568            let _ = err_tx
569                .send(AudDError::Serialization {
570                    message: "Longpoll response was not a JSON object".into(),
571                    raw_text: resp.raw_text,
572                })
573                .await;
574            return;
575        };
576
577        // Silently absorb keepalive ticks.
578        if is_longpoll_keepalive(&body) {
579            if let Some(ts) = body.get("timestamp").and_then(Value::as_i64) {
580                cur_since = Some(ts);
581            }
582            continue;
583        }
584
585        // Advance the cursor before parsing — even if parsing fails, we don't
586        // want to re-poll the same window.
587        if let Some(ts) = body.get("timestamp").and_then(Value::as_i64) {
588            cur_since = Some(ts);
589        }
590
591        match parse_callback(body) {
592            Ok(CallbackEvent::Match(m)) => {
593                tokio::select! {
594                    biased;
595                    _ = shutdown_rx.recv() => return,
596                    res = match_tx.send(m) => {
597                        if res.is_err() { return; }
598                    }
599                }
600            }
601            Ok(CallbackEvent::Notification(n)) => {
602                tokio::select! {
603                    biased;
604                    _ = shutdown_rx.recv() => return,
605                    res = notif_tx.send(n) => {
606                        if res.is_err() { return; }
607                    }
608                }
609            }
610            Err(e) => {
611                let _ = err_tx.send(e).await;
612                return;
613            }
614        }
615    }
616}
617
618/// Reports whether `body` is a `{"timeout": "no events before timeout"}`
619/// keepalive tick — the server emits one of these every `<timeout>` seconds
620/// when no recognition or notification is queued. Mirrors audd-go's
621/// `isLongpollKeepalive` helper.
622pub(crate) fn is_longpoll_keepalive(body: &Value) -> bool {
623    let Some(obj) = body.as_object() else {
624        return false;
625    };
626    if obj.contains_key("result") || obj.contains_key("notification") {
627        return false;
628    }
629    obj.contains_key("timeout")
630}
631
632/// Internal — POST a form body to a streams-namespace endpoint and return the
633/// `result` field on success.
634async fn post_form(
635    http: &HttpClient,
636    url: &str,
637    fields: &[(&str, String)],
638    policy: RetryPolicy,
639) -> Result<Value, AudDError> {
640    let url = url.to_string();
641    let fields: Vec<(&str, String)> = fields.iter().map(|(k, v)| (*k, v.clone())).collect();
642    let resp = retry_async(
643        || {
644            let http = http.clone();
645            let url = url.clone();
646            let fields = fields.clone();
647            async move { http.post_form(&url, &fields, None, None).await }
648        },
649        policy,
650    )
651    .await?;
652    let body = decode_or_raise(resp, false)?;
653    Ok(body.get("result").cloned().unwrap_or(Value::Null))
654}
655
// Unit tests for the pieces of this module that need no network access:
// option defaults, the option builder, and keepalive detection.
#[cfg(test)]
mod tests {
    use super::*;

    // Defaults: 50-second timeout, callback preflight enabled.
    #[test]
    fn longpoll_options_default() {
        let o = LongpollOptions::default();
        assert_eq!(o.timeout, 50);
        assert!(!o.skip_callback_check);
    }

    // Each builder setter overrides exactly its own field.
    #[test]
    fn longpoll_options_chain() {
        let o = LongpollOptions::default()
            .timeout(30)
            .since_time(123)
            .skip_callback_check(true);
        assert_eq!(o.timeout, 30);
        assert_eq!(o.since_time, Some(123));
        assert!(o.skip_callback_check);
    }

    // A keepalive is an object with `timeout` and without `result` /
    // `notification`; everything else is a real event or malformed.
    #[test]
    fn keepalive_detection() {
        let kp = serde_json::json!({"timeout": "no events before timeout", "timestamp": 1});
        assert!(is_longpoll_keepalive(&kp));

        // `result` present: a match, even if `timeout` is also there.
        let with_result = serde_json::json!({
            "result": {"radio_id": 1, "results": []},
            "timeout": "no events"
        });
        assert!(!is_longpoll_keepalive(&with_result));

        // `notification` present: a lifecycle event, not a keepalive.
        let with_notif = serde_json::json!({
            "notification": {"radio_id": 1},
            "timeout": "x"
        });
        assert!(!is_longpoll_keepalive(&with_notif));

        // No `timeout` key at all.
        let no_timeout = serde_json::json!({"timestamp": 1});
        assert!(!is_longpoll_keepalive(&no_timeout));

        // Non-object bodies are never keepalives.
        let not_object = serde_json::json!([1, 2, 3]);
        assert!(!is_longpoll_keepalive(&not_object));
    }
}