use std::pin::Pin;
use futures_core::Stream;
use serde_json::Value;
use crate::client::{decode_or_raise, AudDInner};
use crate::errors::{AudDError, ErrorKind};
use crate::helpers::{add_return_to_url, derive_longpoll_category, parse_callback};
use crate::http::HttpClient;
use crate::models::{Stream as StreamRow, StreamCallbackPayload};
use crate::retry::{retry_async, RetryPolicy};
/// API error code that `Streams::preflight_callback` compares against to detect the
/// "no callback URL" condition before starting a longpoll.
const NO_CALLBACK_ERROR_CODE: i32 = 19;
/// Message placed on the error that `Streams::preflight_callback` returns instead of the
/// raw API error when `NO_CALLBACK_ERROR_CODE` is seen.
const PREFLIGHT_NO_CALLBACK_HINT: &str =
"Longpoll won't deliver events because no callback URL is configured for this account. \
Set one first via streams.set_callback_url(...) — `https://audd.tech/empty/` is fine if \
you only want longpolling and don't need a real receiver. \
To skip this check, pass skip_callback_check=true.";
/// Tunables for [`Streams::longpoll`], built with chained setters starting from
/// [`LongpollOptions::default`].
#[derive(Debug, Clone)]
pub struct LongpollOptions {
    /// Initial `since_time` request parameter; omitted from the request while `None`.
    since_time: Option<i64>,
    /// Value sent as the `timeout` request parameter (seconds).
    timeout: i64,
    /// When `true`, `Streams::longpoll` does not run its callback-URL preflight.
    skip_callback_check: bool,
}

impl Default for LongpollOptions {
    /// No starting `since_time`, a 50 second timeout, and the preflight check enabled.
    fn default() -> Self {
        LongpollOptions {
            skip_callback_check: false,
            timeout: 50,
            since_time: None,
        }
    }
}

impl LongpollOptions {
    /// Returns the options with the starting `since_time` set to `t`.
    #[must_use]
    pub fn since_time(self, t: i64) -> Self {
        Self {
            since_time: Some(t),
            ..self
        }
    }

    /// Returns the options with the timeout replaced by `secs`.
    #[must_use]
    pub fn timeout(self, secs: i64) -> Self {
        Self {
            timeout: secs,
            ..self
        }
    }

    /// Returns the options with the callback preflight skipped (`true`) or enabled (`false`).
    #[must_use]
    pub fn skip_callback_check(self, skip: bool) -> Self {
        Self {
            skip_callback_check: skip,
            ..self
        }
    }
}
/// Borrowed view over an [`AudDInner`] that exposes the stream-related endpoints.
pub struct Streams<'a> {
    inner: &'a AudDInner,
}

impl<'a> Streams<'a> {
    /// Wraps a borrowed client core.
    pub(crate) fn new(inner: &'a AudDInner) -> Self {
        Streams { inner }
    }

    /// POSTs `url` to the `setCallbackUrl` endpoint under the mutating retry policy.
    ///
    /// `url` and `return_metadata` are first combined by `add_return_to_url`
    /// (presumably appending a `return` parameter — confirm against the helper).
    ///
    /// # Errors
    /// Propagates any error from `add_return_to_url` or from the request itself.
    pub async fn set_callback_url(
        &self,
        url: &str,
        return_metadata: Option<&[String]>,
    ) -> Result<(), AudDError> {
        let callback = add_return_to_url(url, return_metadata)?;
        let endpoint = format!("{}/setCallbackUrl/", self.inner.api_base);
        let form = [("url", callback)];
        post_form(
            &self.inner.http,
            &endpoint,
            &form,
            self.inner.mutating_policy(),
        )
        .await?;
        Ok(())
    }

    /// Fetches the `result` of the `getCallbackUrl` endpoint under the read retry policy.
    ///
    /// A JSON string result is returned verbatim; any other JSON value is returned in its
    /// serialized form.
    ///
    /// # Errors
    /// Propagates any error from the request.
    pub async fn get_callback_url(&self) -> Result<String, AudDError> {
        let endpoint = format!("{}/getCallbackUrl/", self.inner.api_base);
        let result = post_form(&self.inner.http, &endpoint, &[], self.inner.read_policy()).await?;
        let text = match result.as_str() {
            Some(s) => s.to_string(),
            None => result.to_string(),
        };
        Ok(text)
    }

    /// POSTs `url` and `radio_id` (plus `callbacks`, when given) to the `addStream` endpoint.
    ///
    /// # Errors
    /// Propagates any error from the request.
    pub async fn add(
        &self,
        url: &str,
        radio_id: i64,
        callbacks: Option<&str>,
    ) -> Result<(), AudDError> {
        let mut form: Vec<(&str, String)> =
            vec![("url", url.to_string()), ("radio_id", radio_id.to_string())];
        // The `callbacks` field is only sent when the caller supplied one.
        form.extend(callbacks.map(|cb| ("callbacks", cb.to_string())));
        let endpoint = format!("{}/addStream/", self.inner.api_base);
        post_form(
            &self.inner.http,
            &endpoint,
            &form,
            self.inner.mutating_policy(),
        )
        .await?;
        Ok(())
    }

    /// POSTs `radio_id` and the new `url` to the `setStreamUrl` endpoint.
    ///
    /// # Errors
    /// Propagates any error from the request.
    pub async fn set_url(&self, radio_id: i64, url: &str) -> Result<(), AudDError> {
        let endpoint = format!("{}/setStreamUrl/", self.inner.api_base);
        let form = [("radio_id", radio_id.to_string()), ("url", url.to_string())];
        post_form(
            &self.inner.http,
            &endpoint,
            &form,
            self.inner.mutating_policy(),
        )
        .await?;
        Ok(())
    }

    /// POSTs `radio_id` to the `deleteStream` endpoint.
    ///
    /// # Errors
    /// Propagates any error from the request.
    pub async fn delete(&self, radio_id: i64) -> Result<(), AudDError> {
        let endpoint = format!("{}/deleteStream/", self.inner.api_base);
        let form = [("radio_id", radio_id.to_string())];
        post_form(
            &self.inner.http,
            &endpoint,
            &form,
            self.inner.mutating_policy(),
        )
        .await?;
        Ok(())
    }

    /// Fetches the `getStreams` result and deserializes it into stream rows.
    ///
    /// A `null` result is reported as an empty list.
    ///
    /// # Errors
    /// Propagates request errors, and returns [`AudDError::Serialization`] (carrying the
    /// raw JSON text) when the result cannot be deserialized.
    pub async fn list(&self) -> Result<Vec<StreamRow>, AudDError> {
        let endpoint = format!("{}/getStreams/", self.inner.api_base);
        let result = post_form(&self.inner.http, &endpoint, &[], self.inner.read_policy()).await?;
        if result.is_null() {
            Ok(Vec::new())
        } else {
            // The value is cloned so the original stays available for the error text.
            serde_json::from_value::<Vec<StreamRow>>(result.clone()).map_err(|e| {
                AudDError::Serialization {
                    message: format!("could not parse getStreams result: {e}"),
                    raw_text: result.to_string(),
                }
            })
        }
    }

    /// Delegates to the `derive_longpoll_category` helper with this client's API token
    /// and `radio_id`.
    #[must_use]
    pub fn derive_longpoll_category(&self, radio_id: i64) -> String {
        let token = self.inner.api_token();
        derive_longpoll_category(&token, radio_id)
    }

    /// Delegates to the `parse_callback` helper to decode a callback body.
    ///
    /// # Errors
    /// Returns whatever error the helper produces for `body`.
    pub fn parse_callback(&self, body: Value) -> Result<StreamCallbackPayload, AudDError> {
        parse_callback(body)
    }

    /// Starts a longpoll on `category` and returns the resulting event stream.
    ///
    /// Unless `opts` has `skip_callback_check` set, a callback-URL preflight runs first
    /// and its error, if any, is returned before any stream is created.
    ///
    /// # Errors
    /// Returns the preflight error; errors from polling are delivered as stream items.
    pub async fn longpoll(
        &self,
        category: &str,
        opts: LongpollOptions,
    ) -> Result<Pin<Box<dyn Stream<Item = Result<Value, AudDError>> + Send>>, AudDError> {
        if !opts.skip_callback_check {
            self.preflight_callback().await?;
        }
        let http = self.inner.http.clone();
        let endpoint = format!("{}/longpoll/", self.inner.api_base);
        let events = longpoll_stream(
            http,
            endpoint,
            category.to_string(),
            opts,
            self.inner.read_policy(),
        );
        Ok(events)
    }

    /// Calls `get_callback_url` and turns the "no callback" API error into a hint.
    ///
    /// Success yields `Ok(())`. An error whose code equals `NO_CALLBACK_ERROR_CODE` is
    /// replaced by an `InvalidRequest` API error carrying `PREFLIGHT_NO_CALLBACK_HINT`,
    /// keeping the original HTTP status and request id when available. Any other error
    /// is returned unchanged.
    async fn preflight_callback(&self) -> Result<(), AudDError> {
        let err = match self.get_callback_url().await {
            Ok(_) => return Ok(()),
            Err(e) => e,
        };
        if err.error_code() != Some(NO_CALLBACK_ERROR_CODE) {
            return Err(err);
        }
        // Carry over the transport details from the original API error, if it was one.
        let (http_status, request_id) = if let AudDError::Api {
            http_status,
            request_id,
            ..
        } = &err
        {
            (*http_status, request_id.clone())
        } else {
            (0, None)
        };
        Err(AudDError::Api {
            code: 0,
            message: PREFLIGHT_NO_CALLBACK_HINT.to_string(),
            kind: ErrorKind::InvalidRequest,
            http_status,
            request_id,
            requested_params: std::collections::HashMap::new(),
            request_method: None,
            branded_message: None,
            raw_response: Value::Null,
        })
    }
}
/// Builds the boxed stream that repeatedly polls the longpoll endpoint.
///
/// Every iteration issues a GET to `url` through `retry_async` under `policy`, sending
/// `category`, `timeout` and — once a value is known — `since_time`. The response must be
/// a JSON object; it is yielded unchanged. When the object has an integer `timestamp`
/// field, that value becomes the `since_time` of the next request.
///
/// The loop has no exit of its own: the stream ends only when a request fails or a
/// response is not a JSON object, in which case that error is the final item.
///
/// * `http` – client used for the requests (cloned per attempt).
/// * `url` – full longpoll endpoint URL.
/// * `category` – longpoll category to subscribe to.
/// * `opts` – supplies the initial `since_time` and the `timeout` value.
/// * `policy` – retry policy applied to every request.
fn longpoll_stream(
    http: HttpClient,
    url: String,
    category: String,
    opts: LongpollOptions,
    policy: RetryPolicy,
) -> Pin<Box<dyn Stream<Item = Result<Value, AudDError>> + Send>> {
    let mut cur_since = opts.since_time;
    let timeout = opts.timeout;
    let stream = async_stream::try_stream! {
        loop {
            let mut params: Vec<(&str, String)> = vec![
                ("category", category.clone()),
                ("timeout", timeout.to_string()),
            ];
            if let Some(t) = cur_since {
                params.push(("since_time", t.to_string()));
            }
            let resp = retry_async(
                || {
                    // Each retry attempt needs its own owned copies for the `async move`.
                    let http = http.clone();
                    let url = url.clone();
                    let params = params.clone();
                    async move {
                        http.get(&url, &params, None).await
                    }
                },
                policy,
            )
            .await?;
            let body = resp.json_body.ok_or_else(|| AudDError::Serialization {
                message: "longpoll response was not JSON".into(),
                raw_text: resp.raw_text.clone(),
            })?;
            let body_obj = body.as_object().ok_or_else(|| AudDError::Serialization {
                message: "longpoll response was not a JSON object".into(),
                raw_text: resp.raw_text.clone(),
            })?;
            // Advance the cursor so the next poll only asks for newer events.
            if let Some(ts) = body_obj.get("timestamp").and_then(Value::as_i64) {
                cur_since = Some(ts);
            }
            yield body;
        }
    };
    Box::pin(stream)
}
/// Sends a form POST through `retry_async` and returns the `result` member of the reply.
///
/// The response is passed to `decode_or_raise` (with its flag set to `false`); whatever
/// error that or the request produces is propagated. A decoded body without a `result`
/// key yields `Value::Null`.
///
/// * `http` – client used for the request (cloned per attempt).
/// * `url` – full endpoint URL.
/// * `fields` – form fields to send.
/// * `policy` – retry policy for the request.
async fn post_form(
    http: &HttpClient,
    url: &str,
    fields: &[(&str, String)],
    policy: RetryPolicy,
) -> Result<Value, AudDError> {
    // Owned copies that every retry attempt can clone from.
    let endpoint = url.to_string();
    let form: Vec<(&str, String)> = fields.to_vec();
    let attempt = || {
        let http = http.clone();
        let endpoint = endpoint.clone();
        let form = form.clone();
        async move { http.post_form(&endpoint, &form, None, None).await }
    };
    let resp = retry_async(attempt, policy).await?;
    let body = decode_or_raise(resp, false)?;
    match body.get("result") {
        Some(value) => Ok(value.clone()),
        None => Ok(Value::Null),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Defaults: 50 second timeout and the callback preflight left enabled.
    #[test]
    fn longpoll_options_default() {
        let defaults = LongpollOptions::default();
        assert_eq!(defaults.timeout, 50);
        assert!(!defaults.skip_callback_check);
    }

    /// Chained setters each record their own value.
    #[test]
    fn longpoll_options_chain() {
        let base = LongpollOptions::default();
        let configured = base.timeout(30).since_time(123).skip_callback_check(true);
        assert_eq!(configured.timeout, 30);
        assert_eq!(configured.since_time, Some(123));
        assert!(configured.skip_callback_check);
    }
}