atm0s_media_server_record/
lib.rs

1use std::collections::{HashMap, VecDeque};
2
3use media_server_protocol::{
4    protobuf::cluster_connector::{RecordReq, RecordRes},
5    record::SessionRecordEvent,
6};
7use session::SessionRecord;
8use storage::{memory::MemoryFile, FileId, RecordFile};
9use tokio::sync::mpsc::Sender;
10use worker::UploadWorker;
11
12#[cfg(feature = "convert_record")]
13pub mod convert;
14mod raw_record;
15mod session;
16mod storage;
17mod worker;
18
19pub use raw_record::*;
20pub use storage::convert_s3_uri;
21
22//TODO: generate MediaRecordStats
23pub struct MediaRecordStats {
24    pub pending_chunks: usize,
25    pub pending_bytes: usize,
26    pub uploading_chunks: usize,
27    pub uploading_bytes: usize,
28    pub uploaded_chunks: usize,
29    pub uploaded_bytes: usize,
30}
31
32pub enum Input {
33    Event(u64, u64, SessionRecordEvent),
34    UploadResponse(u64, RecordRes),
35}
36
37pub enum Output {
38    Stats(MediaRecordStats),
39    UploadRequest(u64, RecordReq),
40}
41
42pub struct MediaRecordService {
43    req_id_seed: u64,
44    queue: VecDeque<Output>,
45    chunk_map: HashMap<u64, FileId>,
46    sessions: HashMap<u64, SessionRecord>,
47    worker_tx: Sender<worker::Input>,
48}
49
50impl MediaRecordService {
51    pub fn new(workers: usize, path: &str, max_mem_size: usize) -> Self {
52        let (mut worker, worker_tx) = UploadWorker::new(path, max_mem_size);
53        for i in 0..workers {
54            worker.start_child_worker(i);
55        }
56
57        tokio::spawn(async move {
58            loop {
59                if let Err(e) = worker.recv().await {
60                    log::error!("worker error {e}");
61                }
62            }
63        });
64
65        Self {
66            req_id_seed: 0,
67            queue: VecDeque::new(),
68            sessions: HashMap::new(),
69            chunk_map: HashMap::new(),
70            worker_tx,
71        }
72    }
73
74    pub fn on_tick(&mut self, now: u64) {
75        for (_id, session) in self.sessions.iter_mut() {
76            if let Some((req, file)) = session.tick(now) {
77                Self::process_chunk(&mut self.req_id_seed, req, file, &mut self.queue, &mut self.chunk_map, &self.worker_tx);
78            }
79        }
80        self.sessions.retain(|_, session| !session.is_closed());
81    }
82
83    pub fn on_input(&mut self, now: u64, event: Input) {
84        match event {
85            Input::Event(session, ts, event) => self.on_record_event(now, session, ts, event),
86            Input::UploadResponse(req_id, res) => {
87                log::info!("[MediaWorkerService] received res for req {req_id}");
88                if let Some(file_id) = self.chunk_map.remove(&req_id) {
89                    if let Err(e) = self.worker_tx.try_send(worker::Input::UploadLink(file_id, res.s3_uri)) {
90                        log::error!("[MediaWorkerService] send record link to record controller worker error {e}");
91                    }
92                }
93            }
94        }
95    }
96
97    pub fn pop_output(&mut self) -> Option<Output> {
98        self.queue.pop_front()
99    }
100}
101
102impl MediaRecordService {
103    fn on_record_event(&mut self, now_ms: u64, session: u64, event_ts: u64, event: SessionRecordEvent) {
104        let session = self.sessions.entry(session).or_insert_with(|| SessionRecord::new(session));
105        if let Some((req, file)) = session.push(now_ms, event_ts, event) {
106            Self::process_chunk(&mut self.req_id_seed, req, file, &mut self.queue, &mut self.chunk_map, &self.worker_tx);
107        }
108    }
109
110    fn process_chunk(req_seed: &mut u64, req: RecordReq, file: MemoryFile, queue: &mut VecDeque<Output>, chunk_map: &mut HashMap<u64, FileId>, worker_tx: &Sender<worker::Input>) {
111        let req_id = *req_seed;
112        *req_seed += 1;
113        log::info!("[MediaWorkerService] request upload uri with req_id {req_id}");
114        queue.push_back(Output::UploadRequest(req_id, req));
115        chunk_map.insert(req_id, file.id());
116        if let Err(e) = worker_tx.try_send(worker::Input::RecordChunk(file)) {
117            log::error!("[MediaWorkerService] send record chunk to record controller worker error {e}");
118        }
119    }
120}