use async_stream::try_stream;
use axum::{
body::{Body, Bytes},
extract::State,
http::{header, Response},
response::{Html, IntoResponse},
routing::get,
Router,
};
use bambu::{CameraClient, CameraPacket};
use futures_core::Stream;
use futures_util::StreamExt;
use std::{convert::Infallible, net::SocketAddr, sync::Arc};
use tokio::{
sync::{
broadcast::{self, Receiver},
RwLock,
},
task,
};
use turbojpeg::DecompressHeader;
async fn read_jpeg_header(jpeg_frame_bytes: Bytes) -> turbojpeg::Result<DecompressHeader> {
task::spawn_blocking(move || turbojpeg::read_header(&jpeg_frame_bytes))
.await
.unwrap()
}
pub(crate) struct Config {
pub(crate) printer_ip: String,
pub(crate) access_code: String,
#[allow(dead_code)]
pub(crate) serial_number: String,
pub(crate) camera_port: u16,
}
impl Config {
pub(crate) fn from_env() -> Self {
let printer_ip = std::env::var("BAMBU_IP").expect("BAMBU_IP must be set");
let access_code =
std::env::var("BAMBU_ACCESS_CODE").expect("BAMBU_ACCESS_CODE must be set");
let serial_number =
std::env::var("BAMBU_SERIAL_NUMBER").expect("BAMBU_SERIAL_NUMBER must be set");
let camera_port: u16 = std::env::var("BAMBU_PORT")
.unwrap_or_else(|_| "6000".to_string())
.parse()
.expect("BAMBU_PORT must be a valid u16");
Self {
printer_ip,
access_code,
serial_number,
camera_port,
}
}
}
const BOUNDARY: &str = "donotcrossboundary";
#[derive(Clone)]
struct AppState {
tx: broadcast::Sender<Bytes>,
last_frame: Arc<RwLock<Option<Bytes>>>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (tx, _rx) = broadcast::channel(16);
let config = Arc::new(Config::from_env());
let last_frame = Arc::new(RwLock::new(None));
tokio::spawn({
let tx = tx.clone();
let last_frame = last_frame.clone();
let config = config.clone();
async move {
let client =
CameraClient::new(&config.printer_ip, &config.access_code, config.camera_port);
let mut frame_stream = match client.connect_and_stream_codec().await {
Ok(stream) => stream,
Err(e) => {
eprintln!("Error connecting to camera: {}", e);
return;
}
};
while let Some(jpeg_frame_bytes) = frame_stream.next().await {
match jpeg_frame_bytes {
Ok(CameraPacket::Auth { .. }) => {
unreachable!("Auth packet should not be received here");
}
Ok(CameraPacket::Jpeg(jpeg_frame_bytes)) => {
println!("Received a JPEG frame of length {}", jpeg_frame_bytes.len());
let jpeg_header = match read_jpeg_header(jpeg_frame_bytes.clone()).await {
Ok(img) => img,
Err(e) => {
eprintln!("Error decoding image: {}", e);
continue;
}
};
println!(
"Image dimensions: {}x{}",
jpeg_header.width, jpeg_header.height
);
{
let mut last_frame = last_frame.write().await;
*last_frame = Some(jpeg_frame_bytes.clone());
}
if tx.send(jpeg_frame_bytes).is_err() {
eprintln!("Error sending frame to broadcast channel");
break;
}
}
Err(e) => eprintln!("Error receiving frame: {}", e),
}
}
println!("Camera connection closed");
}
});
let app_state = Arc::new(AppState { tx, last_frame });
let app = Router::new()
.route("/stream/live.mjpeg", get(mjpeg_live_stream))
.route(
"/",
get(|| async { Html(r#"<img src="/stream/live.mjpeg"/>"#) }),
)
.with_state(app_state);
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
println!("Serving content on http://{}", addr);
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
Ok(())
}
fn mjpeg_stream(
state: Arc<AppState>,
mut rx: Receiver<Bytes>,
) -> impl Stream<Item = Result<Bytes, Infallible>> {
try_stream! {
if let Some(frame) = state.last_frame.read().await.as_ref() {
let header = format!(
"--{BOUNDARY}\r\nContent-Type: image/jpeg\r\nContent-Length: {}\r\n\r\n",
frame.len()
);
yield Bytes::from(header);
yield frame.clone();
yield Bytes::from_static(b"\r\n");
}
loop {
let frame_bytes = match rx.recv().await {
Ok(data) => data,
Err(_) => {
eprintln!("Error receiving frame from broadcast channel");
break
},
};
let header = format!(
"--{BOUNDARY}\r\nContent-Type: image/jpeg\r\nContent-Length: {}\r\n\r\n",
frame_bytes.len()
);
yield Bytes::from(header);
yield frame_bytes;
yield Bytes::from_static(b"\r\n");
}
}
}
async fn mjpeg_live_stream(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let rx = state.tx.subscribe();
Response::builder()
.header(
header::CONTENT_TYPE,
format!("multipart/x-mixed-replace; boundary={BOUNDARY}"),
)
.body(Body::from_stream(mjpeg_stream(Arc::clone(&state), rx)))
.unwrap()
}