use crate::{Actor, ActorBehavior, Message, Port};
use anyhow::{Error, Result};
use reflow_actor::{message::EncodableValue, stream::StreamFrame, ActorContext};
use reflow_actor_macro::actor;
use serde_json::json;
use std::collections::HashMap;
/// Content-type prefixes the audio display actor accepts when the
/// configuration supplies no usable `acceptedFormats` list.
///
/// Entries are compared as prefixes of the stream's content type, so a
/// content type carrying parameters (e.g. `audio/ogg; codecs=opus`) still
/// matches its base entry.
const DEFAULT_AUDIO_CONTENT_TYPES: &[&str] = &[
    "audio/mpeg",
    "audio/wav",
    "audio/ogg",
    "audio/webm",
    "audio/aac",
    "audio/flac",
    "audio/raw-pcm",
];
/// Consumes an audio stream from the `stream` inport and emits a summary of it.
///
/// The actor reads the leading `Begin` frame, validates the stream's content
/// type against the accepted formats, drains the remaining frames while
/// counting bytes and chunks, and finally emits a JSON object on the
/// `metadata` outport.
///
/// # Configuration
///
/// * `autoplay` (bool, default `false`)
/// * `loop` (bool, default `false`)
/// * `showWaveform` (bool, default `true`)
/// * `acceptedFormats` (comma-separated string of content-type prefixes;
///   falls back to `DEFAULT_AUDIO_CONTENT_TYPES` when absent or when it
///   contains no non-empty entry)
///
/// # Errors
///
/// Returns `Err` when no stream receiver is attached to the `stream` port or
/// when the channel closes before any frame arrives. Every other failure
/// (a stream `Error` frame, a missing `Begin` frame, an unsupported content
/// type) is reported as a message on the `error` outport instead.
#[actor(
    AudioStreamDisplayActor,
    inports::<100>(stream),
    outports::<50>(metadata, error),
    state(MemoryState)
)]
pub async fn audio_stream_display_actor(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let config = context.get_config_hashmap();

    let autoplay = config
        .get("autoplay")
        .and_then(|v| v.as_bool())
        .unwrap_or(false);
    let loop_playback = config
        .get("loop")
        .and_then(|v| v.as_bool())
        .unwrap_or(false);
    let show_waveform = config
        .get("showWaveform")
        .and_then(|v| v.as_bool())
        .unwrap_or(true);

    let stream_rx = context
        .take_stream_receiver("stream")
        .ok_or_else(|| anyhow::anyhow!("No stream handle on 'stream' port"))?;

    // The very first frame must describe the stream.
    let first = stream_rx
        .recv_async()
        .await
        .map_err(|_| anyhow::anyhow!("Stream closed before Begin frame"))?;

    let (content_type, size_hint, stream_meta) = match first {
        StreamFrame::Begin {
            content_type,
            size_hint,
            metadata,
        } => (content_type, size_hint, metadata),
        StreamFrame::Error(e) => return Ok(error_output(format!("Stream error: {}", e))),
        _ => return Ok(error_output("Expected Begin frame, got data".into())),
    };

    let ct = content_type
        .as_deref()
        .unwrap_or("application/octet-stream");

    let accepted = parse_accepted_audio_formats(
        config.get("acceptedFormats").and_then(|v| v.as_str()),
    );
    if !is_accepted_audio_format(ct, &accepted) {
        return Ok(error_output(format!(
            "Unsupported audio stream format: {}",
            ct
        )));
    }

    // Optional numeric hints carried in the Begin frame's metadata. The
    // closure is scoped to this block so its borrow of `stream_meta` does not
    // live across the awaits below.
    let (sample_rate, channels, bit_depth, duration_ms) = {
        let meta_u64 = |key: &str| -> Option<u64> {
            stream_meta
                .as_ref()
                .and_then(|m| m.get(key))
                .and_then(|v| v.as_u64())
        };
        (
            meta_u64("sampleRate"),
            meta_u64("channels"),
            meta_u64("bitDepth"),
            meta_u64("durationMs"),
        )
    };

    let mut total_bytes: u64 = 0;
    let mut chunk_count: u64 = 0;
    loop {
        match stream_rx.recv_async().await {
            Ok(StreamFrame::Data(data)) => {
                total_bytes += data.len() as u64;
                chunk_count += 1;
            }
            Ok(StreamFrame::End) => break,
            Ok(StreamFrame::Error(e)) => {
                return Ok(error_output(format!("Stream error during transfer: {}", e)));
            }
            // A repeated Begin frame mid-stream is ignored.
            Ok(StreamFrame::Begin { .. }) => {}
            // A closed channel is treated like End: the totals gathered so
            // far are still reported.
            Err(_) => break,
        }
    }

    let metadata = json!({
        "contentType": ct,
        "sizeHint": size_hint,
        "totalBytes": total_bytes,
        "chunks": chunk_count,
        "sampleRate": sample_rate,
        "channels": channels,
        "bitDepth": bit_depth,
        "durationMs": duration_ms,
        "playback": {
            "autoplay": autoplay,
            "loop": loop_playback,
            "showWaveform": show_waveform,
        },
        "streamMetadata": stream_meta,
    });

    let mut output = HashMap::new();
    output.insert(
        "metadata".to_string(),
        Message::object(EncodableValue::from(metadata)),
    );
    Ok(output)
}

/// Parses the comma-separated `acceptedFormats` setting into lowercase
/// content-type prefixes.
///
/// Empty entries (from an empty setting or stray commas) are dropped: an
/// empty prefix would match every content type and silently disable the
/// format check. When no usable entry remains, the default audio content
/// types are returned.
fn parse_accepted_audio_formats(raw: Option<&str>) -> Vec<String> {
    let configured: Vec<String> = raw
        .map(|list| {
            list.split(',')
                .map(str::trim)
                .filter(|entry| !entry.is_empty())
                .map(str::to_ascii_lowercase)
                .collect()
        })
        .unwrap_or_default();

    if configured.is_empty() {
        DEFAULT_AUDIO_CONTENT_TYPES
            .iter()
            .map(|s| s.to_string())
            .collect()
    } else {
        configured
    }
}

/// Returns `true` when `content_type` starts with one of the `accepted`
/// prefixes, compared case-insensitively (media types are case-insensitive).
///
/// `accepted` is expected to hold lowercase entries, as produced by
/// `parse_accepted_audio_formats`.
fn is_accepted_audio_format(content_type: &str, accepted: &[String]) -> bool {
    let normalized = content_type.to_ascii_lowercase();
    accepted
        .iter()
        .any(|prefix| normalized.starts_with(prefix.as_str()))
}
fn error_output(msg: String) -> HashMap<String, Message> {
let mut out = HashMap::new();
out.insert("error".to_string(), Message::Error(msg.into()));
out
}