// Minimal MJPEG-over-HTTP streaming server built on `std::net` and `std::sync::mpsc`.
use std::{
error::Error,
io::Write,
net::{TcpListener, ToSocketAddrs},
sync::{
mpsc::{sync_channel, Receiver, SyncSender},
Arc, Mutex,
},
thread,
};
/// MJPEG-over-HTTP streamer.
///
/// Frames pushed with [`MJpeg::update_jpeg`] travel through a zero-capacity
/// (rendezvous) channel to the client threads started by [`MJpeg::run`].
/// All client threads share a single receiver, so each frame is delivered to
/// exactly one connected client, and `update_jpeg` blocks until some client
/// thread takes the frame.
pub struct MJpeg {
    /// Producer half of the frame channel, used by `update_jpeg`.
    send: SyncSender<Vec<u8>>,
    /// Consumer half of the frame channel, shared by every client thread.
    recv: Arc<Mutex<Receiver<Vec<u8>>>>,
}

impl MJpeg {
    /// Creates an MJPEG streamer.
    ///
    /// # Examples
    /// ```ignore
    /// let m = Arc::new(MJpeg::new());
    /// ```
    pub fn new() -> Self {
        // Capacity 0 makes this a rendezvous channel: a send completes only
        // when a receiver is ready to take the frame.
        let (send, recv) = sync_channel::<Vec<u8>>(0);
        let recv = Arc::new(Mutex::new(recv));
        Self { send, recv }
    }

    /// Pushes one encoded JPEG frame into the stream.
    ///
    /// Blocks until a client thread started by [`MJpeg::run`] receives the
    /// frame.
    ///
    /// # Errors
    /// Returns the channel's send error if the receiving side has been
    /// disconnected.
    ///
    /// # Examples
    /// ```ignore
    /// let m = Arc::new(MJpeg::new());
    /// let mrc = m.clone();
    /// thread::spawn(move || mrc.run("0.0.0.0:8088").unwrap());
    /// loop {
    ///     let b = camera.take_one().unwrap();
    ///     m.update_jpeg(b).unwrap();
    /// }
    /// ```
    pub fn update_jpeg(&self, buf: Vec<u8>) -> Result<(), Box<dyn Error>> {
        self.send.send(buf)?;
        Ok(())
    }

    /// Binds the MJPEG server to `addr` and serves every incoming connection
    /// on its own thread.
    ///
    /// After a successful bind this call keeps accepting connections in a
    /// loop, so it is normally run on a dedicated thread.
    ///
    /// # Errors
    /// Returns an error if the listener cannot be bound to `addr`. Failures
    /// on individual connections are logged to stderr and do not stop the
    /// server.
    ///
    /// # Examples
    /// ```ignore
    /// let m = Arc::new(MJpeg::new());
    /// let mrc = m.clone();
    /// // This mjpeg server will listen on port 8088.
    /// thread::spawn(move || mrc.run("0.0.0.0:8088").unwrap());
    /// loop {
    ///     let b = camera.take_one().unwrap();
    ///     m.update_jpeg(b).unwrap();
    /// }
    /// ```
    pub fn run<A: ToSocketAddrs>(&self, addr: A) -> Result<(), Box<dyn Error>> {
        let server = TcpListener::bind(addr)?;
        for stream in server.incoming() {
            match stream {
                Ok(mut stream) => {
                    let recv = Arc::clone(&self.recv);
                    thread::spawn(move || {
                        // A write failure (typically the client going away)
                        // ends this client only; it must not panic the thread.
                        if let Err(e) = Self::serve_client(&mut stream, &recv) {
                            eprintln!("client err: {}", e);
                        }
                    });
                }
                // Nothing to serve for a failed accept, so no thread is spawned.
                Err(e) => eprintln!("stream err: {}", e),
            }
        }
        Ok(())
    }

    /// Streams frames from `frames` to `stream` as a
    /// `multipart/x-mixed-replace` HTTP response.
    ///
    /// Returns `Ok(())` once the frame channel is closed or its mutex is
    /// poisoned, and `Err` as soon as writing to `stream` fails.
    fn serve_client<W: Write>(
        stream: &mut W,
        frames: &Mutex<Receiver<Vec<u8>>>,
    ) -> std::io::Result<()> {
        // The header block is closed by the blank line that starts the first
        // part ("\r\n--MJPEGBOUNDARY"), so only a single CRLF is sent here.
        stream.write_all(
            b"HTTP/1.1 200 OK\r\nContent-Type: multipart/x-mixed-replace;boundary=MJPEGBOUNDARY\r\n",
        )?;
        stream.flush()?;
        loop {
            // The guard lives only for the duration of `recv`, so the lock is
            // released before the (possibly slow) socket write below.
            let received = match frames.lock() {
                Ok(guard) => guard.recv(),
                Err(e) => {
                    eprintln!("lock err: {}", e);
                    return Ok(());
                }
            };
            let frame = match received {
                Ok(frame) => frame,
                Err(e) => {
                    // The sender is gone: no further frames can ever arrive.
                    eprintln!("recv err: {}", e);
                    return Ok(());
                }
            };
            let mut part = format!(
                "\r\n--MJPEGBOUNDARY\r\nContent-Type: image/jpeg\r\nContent-Length: {}\r\nX-Timestamp: 0.000000\r\n\r\n",
                frame.len()
            )
            .into_bytes();
            part.extend_from_slice(&frame);
            // `write_all` retries partial writes; a bare `write` may send
            // only a prefix of the frame and corrupt the stream.
            stream.write_all(&part)?;
            stream.flush()?;
        }
    }
}