1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use std::{
    error::Error,
    io::Write,
    net::{TcpListener, ToSocketAddrs},
    sync::{
        mpsc::{sync_channel, Receiver, SyncSender},
        Arc, Mutex,
    },
    thread,
};

/// A minimal MJPEG-over-HTTP streamer.
///
/// Frames are handed from the producer ([`MJpeg::update_jpeg`]) to the client
/// threads started by [`MJpeg::run`] through a single shared channel.
pub struct MJpeg {
    /// Producer side of the frame channel.
    send: SyncSender<Vec<u8>>,
    /// Consumer side of the frame channel, shared by every client thread.
    recv: Arc<Mutex<Receiver<Vec<u8>>>>,
}

impl MJpeg {
    /// Creates an MJPEG streamer.
    ///
    /// # Examples
    /// ```ignore
    /// let m = Arc::new(MJpeg::new());
    /// ```
    pub fn new() -> Self {
        // Capacity 0 makes this a rendezvous channel: a `send` only completes
        // once a receiver is actually taking the value, so frames are never
        // queued up behind a slow or absent client.
        let (send, recv) = sync_channel::<Vec<u8>>(0);
        Self {
            send,
            recv: Arc::new(Mutex::new(recv)),
        }
    }

    /// Pushes one JPEG-encoded frame to the stream.
    ///
    /// Because the internal channel has no buffer, this call blocks until a
    /// client thread picks the frame up. Every client thread reads from the
    /// same receiver, so each frame is delivered to exactly one client.
    ///
    /// # Errors
    /// Returns the channel's send error if the receiving side has been closed.
    ///
    /// # Examples
    /// ```ignore
    /// let m = Arc::new(MJpeg::new());
    /// let mrc = m.clone();
    /// thread::spawn(move || mrc.run("0.0.0.0:8088").unwrap());
    /// loop {
    ///     let b = camera.take_one().unwrap();
    ///     m.update_jpeg(b).unwrap();
    /// }
    /// ```
    pub fn update_jpeg(&self, buf: Vec<u8>) -> Result<(), Box<dyn Error>> {
        self.send.send(buf)?;
        Ok(())
    }

    /// Binds the MJPEG server to `addr` and serves clients until the listener
    /// stops yielding connections.
    ///
    /// Each accepted connection is served on its own thread. A client that
    /// disconnects only ends its own thread; the accept loop keeps running.
    ///
    /// # Errors
    /// Returns an error if binding `addr` fails. Failures on individual
    /// connections are logged and do not abort the server.
    ///
    /// # Examples
    /// ```ignore
    /// let m = Arc::new(MJpeg::new());
    /// let mrc = m.clone();
    /// // This mjpeg-server will run on port 8088.
    /// thread::spawn(move || mrc.run("0.0.0.0:8088").unwrap());
    /// loop {
    ///     let b = camera.take_one().unwrap();
    ///     m.update_jpeg(b).unwrap();
    /// }
    /// ```
    pub fn run<A: ToSocketAddrs>(&self, addr: A) -> Result<(), Box<dyn Error>> {
        let server = TcpListener::bind(addr)?;
        for stream in server.incoming() {
            match stream {
                Ok(stream) => {
                    let recv = Arc::clone(&self.recv);
                    thread::spawn(move || {
                        // A write error here almost always means the client
                        // went away; log it and let this thread finish instead
                        // of panicking.
                        if let Err(e) = Self::serve_client(stream, &recv) {
                            println!("stream err{}", e)
                        }
                    });
                }
                // No need to spawn a thread just to report a failed accept.
                Err(e) => {
                    println!("stream err{}", e)
                }
            }
        }
        Ok(())
    }

    /// Streams frames taken from `recv` to a single client until the client
    /// disconnects or the frame source becomes unusable.
    fn serve_client<W: Write>(
        mut stream: W,
        recv: &Mutex<Receiver<Vec<u8>>>,
    ) -> std::io::Result<()> {
        // The blank line that terminates the HTTP header block is supplied by
        // the leading "\r\n" of the first part header below.
        stream.write_all(b"HTTP/1.1 200 OK\r\nContent-Type: multipart/x-mixed-replace;boundary=MJPEGBOUNDARY\r\n")?;
        stream.flush()?;
        loop {
            // The lock guard lives only for the duration of `recv()`, so it is
            // released before the (possibly slow) socket write below.
            let frame = match recv.lock().map(|rx| rx.recv()) {
                Ok(Ok(frame)) => frame,
                Ok(Err(e)) => {
                    // The sender is gone: no more frames will ever arrive, so
                    // stop instead of spinning on the same error forever.
                    println!("recv err{}", e);
                    return Ok(());
                }
                Err(e) => {
                    // A poisoned lock stays poisoned; retrying cannot succeed.
                    println!("lock err{}", e);
                    return Ok(());
                }
            };
            let header = format!("\r\n--MJPEGBOUNDARY\r\nContent-Type: image/jpeg\r\nContent-Length: {}\r\nX-Timestamp: 0.000000\r\n\r\n", frame.len());
            let mut packet = header.into_bytes();
            packet.extend_from_slice(&frame);
            // `write_all` retries short writes; a bare `write` may send only
            // part of the frame and corrupt the multipart stream.
            stream.write_all(&packet)?;
            stream.flush()?;
        }
    }
}