use crate::{Actor, ActorBehavior, Message, Port};
use anyhow::{Error, Result};
use reflow_actor::{
message::EncodableValue,
stream::{StreamFrame, STREAM_REGISTRY},
ActorContext,
};
use reflow_actor_macro::actor;
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
/// Collects fixed-size raw frames from the `frame` inport and forwards them
/// into a single `video/raw-rgba` stream.
///
/// Per invocation:
/// - A `frame` payload is accepted only when it is `Message::Bytes` whose
///   length is exactly `width * height * 4`; anything else is ignored.
/// - The first accepted frame creates the stream, stores its id and a
///   `frame_count` of 1 in the `_collector` pool, sends `Begin` followed by
///   the frame data, and emits the stream handle on the `stream` outport.
/// - Every later accepted frame is sent as `Data` on the existing stream,
///   `frame_count` is incremented, and a `progress` object
///   (`{ frame, totalFrames }`) is emitted.
/// - `End` is sent when `done` is truthy or, if `totalFrames > 0`, once the
///   number of forwarded frames reaches `totalFrames`.
/// - A truthy `done` without an accepted frame ends an already-open stream
///   and produces no output.
///
/// Config keys: `logProgress` (bool, default `false`), `totalFrames`
/// (default `0`, treated as "unknown"), `width` / `height` (default `512`),
/// `fps` (default `30`).
///
/// The `frame_number` inport and the `error` outport are declared but not
/// used by this function. Stream send failures are deliberately ignored.
///
/// Returns the map of outport name to message; it is empty when nothing is
/// emitted.
#[actor(
    RenderFrameCollectorActor,
    inports::<100>(frame, frame_number, done),
    outports::<50>(stream, progress, error),
    state(MemoryState)
)]
pub async fn render_frame_collector_actor(
    ctx: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let payload = ctx.get_payload();
    let config = ctx.get_config_hashmap();

    let log_progress = config
        .get("logProgress")
        .and_then(|v| v.as_bool())
        .unwrap_or(false);
    let total_frames = config
        .get("totalFrames")
        .and_then(|v| v.as_u64())
        .unwrap_or(0) as usize;
    let width = config.get("width").and_then(|v| v.as_u64()).unwrap_or(512) as u32;
    let height = config.get("height").and_then(|v| v.as_u64()).unwrap_or(512) as u32;
    let fps = config.get("fps").and_then(|v| v.as_u64()).unwrap_or(30) as u32;

    // Any non-zero number, `true`, or a bare flow signal counts as "done".
    let is_done = match payload.get("done") {
        Some(Message::Boolean(b)) => *b,
        Some(Message::Integer(i)) => *i != 0,
        Some(Message::Float(f)) => *f != 0.0,
        Some(Message::Flow) => true,
        _ => false,
    };

    // Four bytes per pixel. Computed with checked arithmetic so that huge
    // dimensions cannot wrap around and make a wrongly sized buffer pass.
    let expected_len = (width as usize)
        .checked_mul(height as usize)
        .and_then(|pixels| pixels.checked_mul(4));

    let frame_bytes = match payload.get("frame") {
        Some(Message::Bytes(b)) if Some(b.len()) == expected_len => Some(b.clone()),
        _ => None,
    };

    // Nothing usable arrived: no frame and no completion signal.
    if frame_bytes.is_none() && !is_done {
        return Ok(HashMap::new());
    }

    let state: HashMap<String, serde_json::Value> =
        ctx.get_pool("_collector").into_iter().collect();
    let existing_stream_id = state.get("stream_id").and_then(|v| v.as_u64());

    let frame_bytes = match frame_bytes {
        Some(bytes) => bytes,
        None => {
            // `done` without a frame: close the stream if one is open.
            if let Some(stream_id) = existing_stream_id {
                if let Some(tx) = STREAM_REGISTRY.clone_sender(stream_id) {
                    let _ = tx.send(StreamFrame::End);
                }
            }
            return Ok(HashMap::new());
        }
    };

    let mut results = HashMap::new();

    if let Some(stream_id) = existing_stream_id {
        if let Some(tx) = STREAM_REGISTRY.clone_sender(stream_id) {
            let _ = tx.send(StreamFrame::Data(Arc::new(frame_bytes.to_vec())));

            // `count` is the 1-based index of the frame that was just sent.
            let count = state
                .get("frame_count")
                .and_then(|v| v.as_u64())
                .unwrap_or(0)
                + 1;
            ctx.pool_upsert("_collector", "frame_count", json!(count));

            // A total of 0 means "unknown", so it never triggers completion.
            let reached_total = total_frames > 0 && count >= total_frames as u64;
            if is_done || reached_total {
                let _ = tx.send(StreamFrame::End);
            }

            results.insert(
                "progress".to_string(),
                Message::object(EncodableValue::from(json!({
                    "frame": count,
                    "totalFrames": total_frames,
                }))),
            );

            // Throttled logging: first three frames, every sixth, and the last.
            if log_progress && (count <= 3 || count % 6 == 0 || reached_total) {
                eprintln!("[collector] frame {}/{}", count, total_frames);
            }
        }
    } else {
        // Only advertise a byte size when the frame total is known and the
        // product fits in a u64.
        let size_hint = if total_frames > 0 {
            (total_frames as u64)
                .checked_mul(width as u64)
                .and_then(|n| n.checked_mul(height as u64))
                .and_then(|n| n.checked_mul(4))
        } else {
            None
        };

        let (tx, handle) = ctx.create_stream(
            "stream",
            Some("video/raw-rgba".to_string()),
            size_hint,
            None,
        );
        ctx.pool_upsert("_collector", "stream_id", json!(handle.stream_id));
        ctx.pool_upsert("_collector", "frame_count", json!(1u64));

        let _ = tx.send(StreamFrame::Begin {
            content_type: Some("video/raw-rgba".to_string()),
            size_hint: None,
            metadata: Some(json!({
                "width": width,
                "height": height,
                "fps": fps,
                "totalFrames": total_frames,
            })),
        });
        let _ = tx.send(StreamFrame::Data(Arc::new(frame_bytes.to_vec())));

        if is_done || total_frames == 1 {
            let _ = tx.send(StreamFrame::End);
        }

        results.insert("stream".to_string(), Message::stream_handle(handle));

        if log_progress {
            eprintln!("[collector] frame 1/{}", total_frames);
        }
    }

    Ok(results)
}