use atomic_refcell::AtomicRefCell;
use std::sync::{Arc, LazyLock, Mutex};
use url::Url;
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst_base::subclass::prelude::*;
use crate::icecastsink::client;
use crate::icecastsink::mediaformat::*;
const DEFAULT_LOCATION: Option<Url> = None;
const DEFAULT_TIMEOUT: u32 = 10_000; const DEFAULT_PUBLIC: bool = true;
const DEFAULT_STREAM_NAME: Option<String> = None;
const DEFAULT_AUTO_RECONNECT: bool = true;
#[derive(Debug, Clone)]
struct Settings {
location: Option<Url>,
timeout: u32, public: bool,
stream_name: Option<String>,
auto_reconnect: bool,
}
impl Default for Settings {
fn default() -> Self {
Settings {
location: DEFAULT_LOCATION,
timeout: DEFAULT_TIMEOUT,
public: DEFAULT_PUBLIC,
stream_name: DEFAULT_STREAM_NAME,
auto_reconnect: DEFAULT_AUTO_RECONNECT,
}
}
}
#[derive(Debug, Default, PartialEq)]
enum State {
#[default]
Stopped,
Connecting,
Streaming,
Error,
Cancelled,
}
#[derive(Debug, Default)]
pub struct IcecastSink {
settings: Mutex<Settings>,
state: AtomicRefCell<State>,
format: AtomicRefCell<Option<MediaFormat>>, pending_streamheaders: AtomicRefCell<Vec<gst::Buffer>>,
client: Mutex<Option<Arc<client::IceClient>>>,
}
pub(crate) static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
gst::DebugCategory::new(
"icecastsink",
gst::DebugColorFlags::empty(),
Some("Icecast sink"),
)
});
impl IcecastSink {
fn set_location(&self, uri: Option<&str>) -> Result<(), glib::Error> {
if self.obj().current_state() != gst::State::Null {
return Err(glib::Error::new(
gst::URIError::BadState,
"Changing the `location` property on a started `icecastsink` is not supported",
));
}
let mut settings = self.settings.lock().unwrap();
let Some(uri) = uri else {
settings.location = DEFAULT_LOCATION;
return Ok(());
};
let uri = Url::parse(uri).map_err(|err| {
glib::Error::new(
gst::URIError::BadUri,
&format!("Failed to parse URI '{uri}': {err:?}"),
)
})?;
if uri.scheme() != "ice+http" && uri.scheme() != "ice+https" {
return Err(glib::Error::new(
gst::URIError::UnsupportedProtocol,
&format!("Unsupported URI scheme '{}'", uri.scheme()),
));
}
settings.location = Some(uri);
Ok(())
}
}
impl ObjectImpl for IcecastSink {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: LazyLock<Vec<glib::ParamSpec>> = LazyLock::new(|| {
vec![
glib::ParamSpecString::builder("location")
.nick("Location")
.blurb("Icecast server, credentials and mount path, e.g. ice+http://source:p4ssw0rd@ingest.smoothjazz.radio:8000/radio")
.mutable_ready()
.build(),
glib::ParamSpecUInt::builder("timeout")
.nick("Timeout")
.blurb("Timeout for network activity, in milliseconds")
.maximum(60_000)
.default_value(DEFAULT_TIMEOUT)
.mutable_ready()
.build(),
glib::ParamSpecBoolean::builder("public")
.nick("Public")
.blurb("Whether the stream should be listed on the server's stream directory")
.default_value(DEFAULT_PUBLIC)
.mutable_ready()
.build(),
glib::ParamSpecString::builder("stream-name")
.nick("Stream Name")
.blurb("Name of the stream (if not configured server-side for the mount point)")
.mutable_ready()
.build(),
glib::ParamSpecBoolean::builder("auto-reconnect")
.nick("Auto Reconnect")
.blurb("Automatically re-connect if the connection with the server breaks")
.default_value(DEFAULT_AUTO_RECONNECT)
.mutable_ready()
.build(),
]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let res = match pspec.name() {
"location" => {
let location = value.get::<Option<&str>>().expect("type checked upstream");
self.set_location(location)
}
"timeout" => {
let mut settings = self.settings.lock().unwrap();
let timeout = value.get().expect("type checked upstream");
settings.timeout = timeout;
Ok(())
}
"public" => {
let mut settings = self.settings.lock().unwrap();
let public = value.get().expect("type checked upstream");
settings.public = public;
Ok(())
}
"stream-name" => {
let mut settings = self.settings.lock().unwrap();
settings.stream_name = value
.get::<Option<String>>()
.expect("type checked upstream");
Ok(())
}
"auto-reconnect" => {
let mut settings = self.settings.lock().unwrap();
let auto_reconnect = value.get().expect("type checked upstream");
settings.auto_reconnect = auto_reconnect;
Ok(())
}
name => unimplemented!("Property '{name}'"),
};
if let Err(err) = res {
gst::error!(
CAT,
imp = self,
"Failed to set property `{}`: {:?}",
pspec.name(),
err
);
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"location" => {
let settings = self.settings.lock().unwrap();
let location = settings.location.as_ref().map(Url::to_string);
location.to_value()
}
"timeout" => {
let settings = self.settings.lock().unwrap();
settings.timeout.to_value()
}
"public" => {
let settings = self.settings.lock().unwrap();
settings.public.to_value()
}
"stream-name" => {
let settings = self.settings.lock().unwrap();
settings.stream_name.to_value()
}
"auto-reconnect" => {
let settings = self.settings.lock().unwrap();
settings.auto_reconnect.to_value()
}
name => unimplemented!("Property '{name}'"),
}
}
}
impl GstObjectImpl for IcecastSink {}
impl ElementImpl for IcecastSink {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: LazyLock<gst::subclass::ElementMetadata> = LazyLock::new(|| {
gst::subclass::ElementMetadata::new(
"Icecast Sink",
"Sink/Network",
"Sends an audio stream to an Icecast server",
"Tim-Philipp Müller <tim centricular com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: LazyLock<Vec<gst::PadTemplate>> = LazyLock::new(|| {
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&[
gst::Structure::builder("audio/mpeg")
.field("mpegversion", 1)
.field("layer", gst::IntRange::new(1, 3))
.field("channels", gst::IntRange::new(1, 2))
.field(
"rate",
gst::List::new([
8000i32, 11025, 12000, 16000, 22050, 24000, 32000, 44100, 48000,
]),
)
.field("parsed", true)
.build(),
gst::Structure::builder("audio/mpeg")
.field("mpegversion", gst::List::new([2i32, 4]))
.field(
"rate",
gst::List::new([48000i32, 96000, 44100, 22050, 11025]),
)
.field("stream-format", "adts")
.field("framed", true)
.build(),
gst::Structure::builder("audio/x-flac")
.field("channels", gst::IntRange::new(1, 2))
.field(
"rate",
gst::List::new([48000i32, 96000, 44100, 22050, 11025]),
)
.field("framed", true)
.build(),
gst::Structure::builder("audio/ogg").build(),
]
.into_iter()
.collect::<gst::Caps>(),
)
.unwrap();
vec![sink_pad_template]
});
PAD_TEMPLATES.as_ref()
}
}
impl BaseSinkImpl for IcecastSink {
fn start(&self) -> Result<(), gst::ErrorMessage> {
let settings = self.settings.lock().unwrap();
let Some(url) = settings.location.as_ref() else {
return Err(gst::error_msg!(
gst::ResourceError::Settings,
["No location set"]
));
};
let Some(host_str) = url.host_str() else {
return Err(gst::error_msg!(
gst::ResourceError::Settings,
["No hostname or IP set"]
));
};
gst::info!(CAT, imp = self, "Location: {url}",);
self.post_message(
gst::message::Progress::builder(
gst::ProgressType::Start,
"connect",
&format!("Connecting to {host_str}"),
)
.src(&*self.obj())
.build(),
);
let client = client::IceClient::new(
url.clone(),
settings.public,
settings.stream_name.clone(),
self.obj().name(), )?;
let mut client_guard = self.client.lock().unwrap();
*client_guard = Some(Arc::new(client));
let mut state = self.state.borrow_mut();
*state = State::Connecting;
gst::info!(CAT, imp = self, "Started");
Ok(())
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.borrow_mut();
*state = State::Stopped;
let mut client_guard = self.client.lock().unwrap();
*client_guard = None;
let mut format = self.format.borrow_mut();
*format = None;
gst::info!(CAT, imp = self, "Stopped");
Ok(())
}
fn unlock(&self) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, imp = self, "Unlocking");
let client = self.client();
client.cancel();
Ok(())
}
fn unlock_stop(&self) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, imp = self, "Unlock done");
let client = self.client();
client.clear_cancel();
Ok(())
}
fn set_caps(&self, caps: &gst::Caps) -> Result<(), gst::LoggableError> {
gst::info!(CAT, imp = self, "Got caps {caps}");
let media_format = MediaFormat::from_caps(caps)?;
gst::info!(CAT, imp = self, "{media_format:?}");
let mut format = self.format.borrow_mut();
*format = Some(media_format.clone());
let client = self.client();
client.set_media_format(media_format);
Ok(())
}
fn prepare(&self, _buffer: &gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.borrow_mut();
let client = self.client();
match *state {
State::Streaming => return Ok(gst::FlowSuccess::Ok),
State::Error => return Err(gst::FlowError::Error),
State::Cancelled => return Err(gst::FlowError::Flushing),
_ => {}
}
let timeout = { self.settings.lock().unwrap().timeout };
let res = client.wait_for_connection_and_handshake(timeout);
if let Err(err) = res {
if let Some(err_msg) = err {
gst::info!(CAT, imp = self, "Error {err_msg:?}");
*state = State::Error;
self.post_message(
gst::message::Progress::builder(
gst::ProgressType::Error,
"connect",
"Could not connect to host",
)
.src(&*self.obj())
.build(),
);
self.post_error_message(err_msg);
return Err(gst::FlowError::Error);
} else {
gst::debug!(CAT, imp = self, "Cancelled, flushing");
*state = State::Cancelled;
self.post_message(
gst::message::Progress::builder(
gst::ProgressType::Canceled,
"connect",
"Connect cancelled",
)
.src(&*self.obj())
.build(),
);
return Err(gst::FlowError::Flushing);
}
}
self.post_message(
gst::message::Progress::builder(
gst::ProgressType::Complete,
"connect",
"Connected to host",
)
.src(&*self.obj())
.build(),
);
let mut pending_streamheaders = self.pending_streamheaders.borrow_mut();
let stream_headers = std::mem::take(&mut *pending_streamheaders);
for header_buf in stream_headers {
let map = header_buf.map_readable().map_err(|_| {
gst::error_msg!(gst::CoreError::Failed, ["Failed to map buffer"]);
gst::FlowError::Error
})?;
let write_data = map.as_slice();
gst::info!(CAT, imp = self, "Re-sending stream header {header_buf:?}..");
match client.send_data(write_data, timeout) {
Ok(_) => {}
Err(None) => {
gst::debug!(CAT, imp = self, "Cancelled, flushing");
*state = State::Cancelled;
return Err(gst::FlowError::Flushing);
}
Err(Some(err_msg)) => {
gst::info!(CAT, imp = self, "Error {err_msg:?}");
*state = State::Error;
self.post_error_message(err_msg);
return Err(gst::FlowError::Error);
}
}
}
*state = State::Streaming;
gst::info!(CAT, imp = self, "Ready to stream");
Ok(gst::FlowSuccess::Ok)
}
fn render(&self, buffer: &gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.borrow_mut();
match *state {
State::Error => return Err(gst::FlowError::Error),
State::Cancelled => return Err(gst::FlowError::Flushing),
_ => {}
}
debug_assert_eq!(*state, State::Streaming);
let map = buffer.map_readable().map_err(|_| {
gst::error_msg!(gst::CoreError::Failed, ["Failed to map buffer"]);
gst::FlowError::Error
})?;
let write_data = map.as_slice();
gst::log!(CAT, imp = self, "Sending {buffer:?}..");
let client = self.client();
let (timeout, auto_reconnect) = {
let settings = self.settings.lock().unwrap();
(settings.timeout, settings.auto_reconnect)
};
let res = client.send_data(write_data, timeout);
match res {
Ok(_) => return Ok(gst::FlowSuccess::Ok),
Err(None) => {
gst::debug!(CAT, imp = self, "Cancelled, flushing");
*state = State::Cancelled;
return Err(gst::FlowError::Flushing);
}
Err(Some(err_msg)) => {
gst::info!(CAT, imp = self, "Error {err_msg:?}");
if !auto_reconnect {
*state = State::Error;
self.post_error_message(err_msg);
return Err(gst::FlowError::Error);
}
gst::element_imp_warning!(self, gst::ResourceError::Write, ["{}", err_msg]);
}
}
gst::info!(CAT, imp = self, "Re-connecting after error ...");
drop(client);
drop(state);
self.start().map_err(|_| gst::FlowError::Error)?;
gst::info!(CAT, imp = self, "Re-started.");
let client = self.client();
let format = self.format.borrow();
let media_format = format.as_ref().unwrap().clone();
let stream_headers = media_format.stream_headers();
client.set_media_format(media_format);
let mut pending_streamheaders = self.pending_streamheaders.borrow_mut();
*pending_streamheaders = stream_headers;
Ok(gst::FlowSuccess::Ok)
}
}
impl URIHandlerImpl for IcecastSink {
const URI_TYPE: gst::URIType = gst::URIType::Sink;
fn protocols() -> &'static [&'static str] {
&["ice+http", "ice+https"]
}
fn uri(&self) -> Option<String> {
let settings = self.settings.lock().unwrap();
settings.location.as_ref().map(Url::to_string)
}
fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
self.set_location(Some(uri))
}
}
impl IcecastSink {
fn client(&self) -> Arc<client::IceClient> {
let client_guard = self.client.lock().unwrap();
client_guard.as_ref().unwrap().clone()
}
}
#[glib::object_subclass]
impl ObjectSubclass for IcecastSink {
const NAME: &'static str = "GstIcecastSink";
type Type = super::IcecastSink;
type ParentType = gst_base::BaseSink;
type Interfaces = (gst::URIHandler,);
}