// mjpeg_rs/lib.rs
use std::{
    error::Error,
    io::{self, Write},
    net::{TcpListener, TcpStream, ToSocketAddrs},
    sync::{
        mpsc::{sync_channel, Receiver, SyncSender},
        Arc, Mutex,
    },
    thread,
};
11
/// MJPEG-over-HTTP streamer.
///
/// JPEG frames handed to [`MJpeg::update_jpeg`] are written, as parts of a
/// `multipart/x-mixed-replace` response, to clients connected to the listener
/// started by [`MJpeg::run`].
pub struct MJpeg {
    /// Producer half of the frame channel; used by `update_jpeg`.
    send: SyncSender<Vec<u8>>,
    /// Consumer half of the frame channel, shared by every client thread
    /// spawned in `run`.
    recv: Arc<Mutex<Receiver<Vec<u8>>>>,
}

impl MJpeg {
    /// Creates an MJPEG streamer.
    ///
    /// The internal frame channel is created with capacity 0, so every call to
    /// [`MJpeg::update_jpeg`] hands its frame directly to a waiting client
    /// thread instead of queueing it.
    ///
    /// # Examples
    /// ```ignore
    /// let m = Arc::new(MJpeg::new());
    /// ```
    pub fn new() -> Self {
        let (send, recv) = sync_channel::<Vec<u8>>(0);
        Self {
            send,
            recv: Arc::new(Mutex::new(recv)),
        }
    }

    /// Pushes one encoded JPEG frame to the stream.
    ///
    /// Because the channel has capacity 0, this call blocks until a client
    /// thread started by [`MJpeg::run`] picks the frame up; with no client
    /// connected it keeps blocking. All client threads share one receiver, so
    /// a given frame is delivered to a single client.
    ///
    /// # Errors
    /// Returns the channel's send error if the receiving half is disconnected.
    ///
    /// # Examples
    /// ```ignore
    /// let m = Arc::new(MJpeg::new());
    /// let mrc = m.clone();
    /// thread::spawn(move || mrc.run("0.0.0.0:8088").unwrap());
    /// loop {
    ///     let b = camera.take_one().unwrap();
    ///     m.update_jpeg(b).unwrap();
    /// }
    /// ```
    pub fn update_jpeg(&self, buf: Vec<u8>) -> Result<(), Box<dyn Error>> {
        self.send.send(buf)?;
        Ok(())
    }

    /// Binds the MJPEG server to `addr` and serves clients.
    ///
    /// Every accepted connection is served on its own thread. The HTTP request
    /// itself is never read: the response preamble is sent immediately and is
    /// followed by one multipart part per frame. A client thread ends when
    /// writing to its socket fails (for example because the client went away)
    /// or when the frame channel can no longer be read.
    ///
    /// # Errors
    /// Returns an error if the listener cannot be bound to `addr`. After a
    /// successful bind the accept loop keeps running, so in practice this
    /// method does not return.
    ///
    /// # Examples
    /// ```ignore
    /// let m = Arc::new(MJpeg::new());
    /// let mrc = m.clone();
    /// // This MJPEG server will listen on port 8088.
    /// thread::spawn(move || mrc.run("0.0.0.0:8088").unwrap());
    /// loop {
    ///     let b = camera.take_one().unwrap();
    ///     m.update_jpeg(b).unwrap();
    /// }
    /// ```
    pub fn run<A: ToSocketAddrs>(&self, addr: A) -> Result<(), Box<dyn Error>> {
        let server = TcpListener::bind(addr)?;
        for stream in server.incoming() {
            match stream {
                Ok(stream) => {
                    let recv = Arc::clone(&self.recv);
                    thread::spawn(move || {
                        // A failed write normally just means the client
                        // disconnected: report it and let the thread finish
                        // instead of panicking.
                        if let Err(e) = Self::serve_client(stream, &recv) {
                            println!("stream err{}", e)
                        }
                    });
                }
                Err(e) => {
                    println!("stream err{}", e)
                }
            }
        }
        Ok(())
    }

    /// Streams frames taken from `frames` to a single client until a write
    /// fails or the channel can no longer be read.
    fn serve_client(mut stream: TcpStream, frames: &Mutex<Receiver<Vec<u8>>>) -> io::Result<()> {
        // The blank line that terminates the HTTP headers is supplied by the
        // leading "\r\n" of the first multipart boundary below.
        stream.write_all(b"HTTP/1.1 200 OK\r\nContent-Type: multipart/x-mixed-replace;boundary=MJPEGBOUNDARY\r\n")?;
        stream.flush()?;
        loop {
            // The guard is moved into the closure and released as soon as
            // `recv` returns, so the lock is not held while writing to the socket.
            let frame = match frames.lock().map(|rx| rx.recv()) {
                Ok(Ok(frame)) => frame,
                Ok(Err(e)) => {
                    // Every sender is gone: no further frame can arrive.
                    println!("recv err{}", e);
                    return Ok(());
                }
                Err(e) => {
                    // Poisoned mutex: retrying would only spin on the same error.
                    println!("lock err{}", e);
                    return Ok(());
                }
            };
            let header = format!("\r\n--MJPEGBOUNDARY\r\nContent-Type: image/jpeg\r\nContent-Length: {}\r\nX-Timestamp: 0.000000\r\n\r\n", frame.len());
            // `write_all` retries short writes, so a part is never truncated.
            stream.write_all(header.as_bytes())?;
            stream.write_all(&frame)?;
            stream.flush()?;
        }
    }
}

impl Default for MJpeg {
    /// Equivalent to [`MJpeg::new`].
    fn default() -> Self {
        Self::new()
    }
}