mjpeg_rs/
lib.rs

1use std::{
2    error::Error,
3    io::Write,
4    net::{TcpListener, ToSocketAddrs},
5    sync::{
6        mpsc::{sync_channel, Receiver, SyncSender},
7        Arc, Mutex,
8    },
9    thread,
10};
11
12pub struct MJpeg {
13    send: SyncSender<Vec<u8>>,
14    recv: Arc<Mutex<Receiver<Vec<u8>>>>,
15}
16
17impl MJpeg {
18    /// 创建一个mjpeg推流器
19    /// # example
20    /// ```
21    /// let m = Arc::new(MJpeg::new());
22    /// ```
23    pub fn new() -> Self {
24        let (send, recv) = sync_channel::<Vec<u8>>(0);
25        let recv = Arc::new(Mutex::new(recv));
26        Self { send, recv }
27    }
28
29    /// 将流推送到mjpeg
30    /// # example
31    /// ```
32    /// let m = Arc::new(MJpeg::new());
33    /// let mrc = m.clone();
34    /// thread::spawn(move || mrc.run("0.0.0.0:8088").unwrap());
35    /// loop {
36    ///     let b = camera.take_one().unwrap();
37    ///     m.update_jpeg(b).unwrap();
38    /// }
39    /// ```
40    pub fn update_jpeg(&self, buf: Vec<u8>) -> Result<(), Box<dyn Error>> {
41        self.send.send(buf)?;
42        Ok(())
43    }
44
45    /// 设置mjpeg服务端口
46    /// # example
47    /// ```
48    /// let m = Arc::new(MJpeg::new());
49    /// let mrc = m.clone();
50    /// // 此mjpeg-server将运行在8088端口
51    /// thread::spawn(move || mrc.run("0.0.0.0:8088").unwrap());
52    /// loop {
53    ///     let b = camera.take_one().unwrap();
54    ///     m.update_jpeg(b).unwrap();
55    /// }
56    /// ```
57    pub fn run<A: ToSocketAddrs>(&self, addr: A) -> Result<(), Box<dyn Error>> {
58        let server = TcpListener::bind(addr)?;
59        for stream in server.incoming() {
60            let recv = self.recv.clone();
61            thread::spawn(move || match stream {
62                Ok(stream) => {
63                    let mut stream = stream;
64                    stream.write(b"HTTP/1.1 200 OK\r\nContent-Type: multipart/x-mixed-replace;boundary=MJPEGBOUNDARY\r\n").unwrap();
65                    stream.flush().unwrap();
66                    loop {
67                        match recv.lock().map(|buf| buf.recv()) {
68                            Ok(buf) => match buf {
69                                Ok(mut buf) => {
70                                    let header = format!("\r\n--MJPEGBOUNDARY\r\nContent-Type: image/jpeg\r\nContent-Length: {}\r\nX-Timestamp: 0.000000\r\n\r\n",buf.len());
71                                    let header = header.as_bytes();
72                                    let mut header = header.to_vec();
73                                    header.append(&mut buf);
74                                    stream.write(&mut header).unwrap();
75                                    stream.flush().unwrap();
76                                }
77                                Err(e) => {
78                                    println!("recv err{}", e)
79                                }
80                            },
81                            Err(e) => {
82                                println!("lock err{}", e)
83                            }
84                        };
85                    }
86                }
87                Err(e) => {
88                    println!("stream err{}", e)
89                }
90            });
91        }
92        Ok(())
93    }
94}