#![allow(clippy::result_large_err)]
use std::path::PathBuf;
use audd::streams::LongpollOptions;
use audd::{AudD, StreamCallbackMatch};
use chrono::Utc;
use clap::Parser;
use csv::WriterBuilder;
use futures_util::StreamExt;
use tokio::signal;
/// Default path of the CSV file (the `--output` default value).
const DEFAULT_OUTPUT: &str = "audd_stream_tracks.csv";

/// Radio ID used when `--url` is given without an explicit `--radio-id`.
const DEFAULT_RADIO_ID: i64 = 99_999;

/// Callback URL installed on the account when provisioning a stream and no
/// callback URL is configured yet.
const DEFAULT_CALLBACK_URL: &str = "https://audd.tech/empty/";

/// Error code that the callback-URL lookup is compared against to detect
/// "no callback URL configured".
const NO_CALLBACK_ERROR_CODE: i32 = 19;
// Command-line arguments, parsed with clap's derive API.
//
// At least one of `--url` / `--radio-id` must be supplied; `main` rejects the
// invocation otherwise. This note is a plain comment (not a doc comment) so
// that the explicit `about` / `long_about` settings below stay authoritative.
// The doc comments on the fields, by contrast, are picked up by clap as the
// per-flag help text shown by `--help`.
#[derive(Parser, Debug)]
#[command(
    name = "stream_to_csv",
    about = "Long-poll an AudD stream and append every recognition to a CSV.",
    long_about = None,
)]
struct Args {
    /// Stream URL to provision before listening; the provisioned stream is deleted again on exit
    #[arg(long)]
    url: Option<String>,

    /// Radio ID of the stream to listen on; a built-in default is used when only --url is given
    #[arg(long)]
    radio_id: Option<i64>,

    /// CSV file that recognitions are appended to (created if it does not exist)
    #[arg(long, default_value = DEFAULT_OUTPUT)]
    output: PathBuf,
}
/// How the program treats the stream it listens on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    /// `--url` was given: the stream is added before listening and deleted
    /// again once listening ends.
    ProvisionAndListen,
    /// Only `--radio-id` was given: listen on a stream that already exists
    /// and leave it in place afterwards.
    ListenOnly,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
let (mode, radio_id) = match (&args.url, args.radio_id) {
(Some(_), Some(id)) => (Mode::ProvisionAndListen, id),
(Some(_), None) => (Mode::ProvisionAndListen, DEFAULT_RADIO_ID),
(None, Some(id)) => (Mode::ListenOnly, id),
(None, None) => {
return Err(
"pass --url to provision a stream, or --radio-id to listen on an existing one"
.into(),
);
}
};
let audd = AudD::new(std::env::var("AUDD_API_TOKEN").unwrap_or_default());
let mut we_set_default_callback = false;
match mode {
Mode::ProvisionAndListen => {
we_set_default_callback = ensure_callback_for_provisioning(&audd).await?;
let url = args.url.as_deref().expect("ProvisionAndListen has --url");
audd.streams().add(url, radio_id, None).await?;
eprintln!("added stream radio_id={radio_id} url={url}");
}
Mode::ListenOnly => {
ensure_callback_for_listen_only(&audd).await?;
eprintln!("listening on existing stream radio_id={radio_id}");
}
}
let file_is_fresh = !args.output.exists()
|| std::fs::metadata(&args.output)
.map(|m| m.len() == 0)
.unwrap_or(true);
let file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&args.output)?;
let mut writer = WriterBuilder::new().has_headers(false).from_writer(file);
if file_is_fresh {
writer.write_record([
"received_at",
"radio_id",
"timestamp",
"score",
"artist",
"title",
"album",
"song_link",
])?;
writer.flush()?;
}
let category = audd.streams().derive_longpoll_category(radio_id);
eprintln!(
"writing matches to {} (category={category}, Ctrl-C to stop)",
args.output.display()
);
let mut poll = audd
.streams()
.longpoll(
&category,
LongpollOptions::default()
.timeout(50)
.skip_callback_check(true),
)
.await?;
let shutdown = signal::ctrl_c();
tokio::pin!(shutdown);
loop {
tokio::select! {
biased;
_ = &mut shutdown => {
eprintln!("\nshutdown requested");
break;
}
Some(err) = poll.errors.next() => {
eprintln!("longpoll error: {err}");
break;
}
Some(notif) = poll.notifications.next() => {
eprintln!(
"[notification] radio_id={} code={} {}",
notif.radio_id, notif.notification_code, notif.notification_message,
);
}
Some(m) = poll.matches.next() => {
handle_match(&m, &mut writer)?;
}
else => break,
}
}
poll.close().await;
if mode == Mode::ProvisionAndListen {
match audd.streams().delete(radio_id).await {
Ok(()) => eprintln!("deleted stream radio_id={radio_id}"),
Err(e) => eprintln!("could not delete stream radio_id={radio_id}: {e}"),
}
}
if we_set_default_callback {
eprintln!(
"left {DEFAULT_CALLBACK_URL} as your account callback — change it via \
streams().set_callback_url(...) if needed."
);
}
Ok(())
}
/// Makes sure the account has a callback URL before a stream is provisioned.
///
/// Returns `Ok(false)` when a callback URL is already configured. When the
/// lookup fails with `NO_CALLBACK_ERROR_CODE`, `DEFAULT_CALLBACK_URL` is
/// installed as the account callback and `Ok(true)` is returned so the caller
/// knows this program changed the account setting.
///
/// # Errors
/// Any other lookup error, or a failure to set the default callback URL, is
/// returned to the caller.
async fn ensure_callback_for_provisioning(
    audd: &AudD,
) -> Result<bool, Box<dyn std::error::Error>> {
    let lookup_error = match audd.streams().get_callback_url().await {
        Ok(_) => return Ok(false),
        Err(e) => e,
    };

    // Only the "no callback configured" failure is recoverable here.
    if lookup_error.error_code() != Some(NO_CALLBACK_ERROR_CODE) {
        return Err(lookup_error.into());
    }

    eprintln!(
        "longpoll requires any 200-OK URL server-side; using {DEFAULT_CALLBACK_URL} as a default."
    );
    audd.streams()
        .set_callback_url(DEFAULT_CALLBACK_URL, None)
        .await?;
    Ok(true)
}
/// Checks that the account has a callback URL before listening on an existing
/// stream; unlike the provisioning path, nothing on the account is modified.
///
/// # Errors
/// When the lookup fails with `NO_CALLBACK_ERROR_CODE`, an explanatory message
/// telling the user to configure a callback URL is returned. Any other lookup
/// error is passed through unchanged.
async fn ensure_callback_for_listen_only(
    audd: &AudD,
) -> Result<(), Box<dyn std::error::Error>> {
    let lookup_error = match audd.streams().get_callback_url().await {
        Ok(_) => return Ok(()),
        Err(e) => e,
    };

    if lookup_error.error_code() == Some(NO_CALLBACK_ERROR_CODE) {
        return Err(
            "stream slot exists but no callback URL is configured for this account; \
             longpoll won't deliver. Set one first via streams().set_callback_url(...).\n\
             https://audd.tech/empty/ is fine if you only want longpolling."
                .into(),
        );
    }
    Err(lookup_error.into())
}
/// Appends one match to the CSV and echoes it to stdout.
///
/// One row is written for the primary song and one for each alternative, in
/// that order. All rows of a match share the same `received_at` value (the
/// current UTC time in RFC 3339 form), radio ID, and timestamp. The columns
/// are: received_at, radio_id, timestamp, score, artist, title, album,
/// song_link; a missing timestamp, album, or song link becomes an empty field.
/// The writer is flushed after the rows of the match have been written.
///
/// # Errors
/// Returns the error if writing a record or flushing the writer fails.
fn handle_match(
    m: &StreamCallbackMatch,
    writer: &mut csv::Writer<std::fs::File>,
) -> Result<(), Box<dyn std::error::Error>> {
    // Values shared by every row of this match.
    let received_at = Utc::now().to_rfc3339();
    let radio_id = m.radio_id.to_string();
    let timestamp = m.timestamp.as_deref().unwrap_or("");

    let candidates = std::iter::once(&m.song).chain(m.alternatives.iter());
    for candidate in candidates {
        let score = candidate.score.to_string();
        let album = candidate.album.as_deref().unwrap_or("");
        let song_link = candidate.song_link.as_deref().unwrap_or("");
        writer.write_record([
            received_at.as_str(),
            radio_id.as_str(),
            timestamp,
            score.as_str(),
            candidate.artist.as_str(),
            candidate.title.as_str(),
            album,
            song_link,
        ])?;
    }
    writer.flush()?;

    println!(
        "[match] radio_id={} timestamp={} {} — {}",
        m.radio_id, timestamp, m.song.artist, m.song.title,
    );
    Ok(())
}