1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use {
    super::StreamStatistics,
    std::{sync::Arc, time::Duration},
    tokio::{
        sync::{
            mpsc,
            mpsc::{Receiver, Sender},
            Mutex,
        },
        time,
    },
    xflv::{
        define,
        define::{aac_packet_type, AvcCodecId, SoundFormat},
        mpeg4_aac::Mpeg4Aac,
        mpeg4_avc::Mpeg4Avc,
    },
};

pub struct AvStatistics {
    /*used to calculate video bitrate */
    video_bytes: Arc<Mutex<f32>>,
    /*used to calculate audio bitrate */
    audio_bytes: Arc<Mutex<f32>>,
    //used to calculate frame rate
    frame_count: Arc<Mutex<usize>>,
    //used to calculate GOP
    gop_frame_count: Arc<Mutex<usize>>,
    stream_statistics: Arc<Mutex<StreamStatistics>>,
    pub sender: Sender<bool>,
}

impl AvStatistics {
    pub fn new(app_name: String, stream_name: String) -> Self {
        let (s, _): (Sender<bool>, Receiver<bool>) = mpsc::channel(1);
        Self {
            video_bytes: Arc::new(Mutex::new(0.0)),
            audio_bytes: Arc::new(Mutex::new(0.0)),
            frame_count: Arc::new(Mutex::new(0)),
            gop_frame_count: Arc::new(Mutex::new(0)),
            stream_statistics: Arc::new(Mutex::new(StreamStatistics::new(app_name, stream_name))),
            sender: s,
        }
    }

    pub async fn notify_audio_codec_info(&mut self, codec_info: &Mpeg4Aac) {
        let audio_info = &mut self.stream_statistics.lock().await.audio;
        audio_info.profile = define::u8_2_aac_profile(codec_info.profile);
        audio_info.samplerate = codec_info.sampling_frequency;
        audio_info.sound_format = SoundFormat::AAC;
        audio_info.channels = codec_info.channels;
    }

    pub async fn notify_video_codec_info(&mut self, codec_info: &Mpeg4Avc) {
        let video_info = &mut self.stream_statistics.lock().await.video;
        video_info.codec = AvcCodecId::H264;
        video_info.profile = define::u8_2_avc_profile(codec_info.profile);
        video_info.level = define::u8_2_avc_level(codec_info.level);
        video_info.height = codec_info.height;
        video_info.width = codec_info.width;
    }

    pub async fn notify_audio_statistics_info(&mut self, data_size: usize, aac_packet_type: u8) {
        match aac_packet_type {
            aac_packet_type::AAC_RAW => {
                *self.audio_bytes.lock().await += data_size as f32;
            }
            aac_packet_type::AAC_SEQHDR => {}
            _ => {}
        }
    }

    pub async fn notify_video_statistics_info(&mut self, data_size: usize, is_key_frame: bool) {
        *self.video_bytes.lock().await += data_size as f32;
        *self.frame_count.lock().await += 1;
        if is_key_frame {
            let video_info = &mut self.stream_statistics.lock().await.video;
            video_info.gop = *self.gop_frame_count.lock().await;
            *self.gop_frame_count.lock().await = 0;
        } else {
            *self.gop_frame_count.lock().await += 1;
        }
    }

    pub fn start(&mut self) {
        let mut interval = time::interval(Duration::from_secs(1));

        let video_bytes_clone = self.video_bytes.clone();
        let audio_bytes_clone = self.audio_bytes.clone();
        let frame_count_clone = self.frame_count.clone();
        let stream_statistics_clone = self.stream_statistics.clone();

        let (s, mut r): (Sender<bool>, Receiver<bool>) = mpsc::channel(1);
        self.sender = s;

        tokio::spawn(async move {
            loop {
                tokio::select! {
                   _ = interval.tick() => {
                    {
                        let stream_statistics = &mut stream_statistics_clone.lock().await;
                        let audio_info = &mut stream_statistics.audio;
                        audio_info.bitrate = *audio_bytes_clone.lock().await * 8.0/1000.0;

                        let video_info = &mut stream_statistics.video;
                        video_info.bitrate = *video_bytes_clone.lock().await * 8.0/1000.0;
                        video_info.frame_rate = *frame_count_clone.lock().await;
                    }
                    *video_bytes_clone.lock().await = 0.0;
                    *audio_bytes_clone.lock().await = 0.0;
                    *frame_count_clone.lock().await = 0;
                    // if let Ok(strinfo) = serde_json::to_string(&*stream_statistics_clone.lock().await) {
                    //    // log::info!("stream_info: {strinfo}");
                    // }
                },
                   _ = r.recv() =>{
                        log::info!("avstatistics shutting down");
                        return
                   },
                }
            }
        });
    }

    pub async fn get_avstatistic_data(&self) -> StreamStatistics {
        self.stream_statistics.lock().await.clone()
    }
}