atm0s_media_server_record/
lib.rs1use std::collections::{HashMap, VecDeque};
2
3use media_server_protocol::{
4 protobuf::cluster_connector::{RecordReq, RecordRes},
5 record::SessionRecordEvent,
6};
7use session::SessionRecord;
8use storage::{memory::MemoryFile, FileId, RecordFile};
9use tokio::sync::mpsc::Sender;
10use worker::UploadWorker;
11
12#[cfg(feature = "convert_record")]
13pub mod convert;
14mod raw_record;
15mod session;
16mod storage;
17mod worker;
18
19pub use raw_record::*;
20pub use storage::convert_s3_uri;
21
/// Counters describing record chunks by upload stage.
///
/// Each stage (pending, uploading, uploaded) is reported both as a number of
/// chunks and as a number of bytes. `Default` yields all counters at zero.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MediaRecordStats {
    /// Number of chunks counted as pending.
    pub pending_chunks: usize,
    /// Total size, in bytes, of the pending chunks.
    pub pending_bytes: usize,
    /// Number of chunks counted as uploading.
    pub uploading_chunks: usize,
    /// Total size, in bytes, of the uploading chunks.
    pub uploading_bytes: usize,
    /// Number of chunks counted as uploaded.
    pub uploaded_chunks: usize,
    /// Total size, in bytes, of the uploaded chunks.
    pub uploaded_bytes: usize,
}
31
32pub enum Input {
33 Event(u64, u64, SessionRecordEvent),
34 UploadResponse(u64, RecordRes),
35}
36
37pub enum Output {
38 Stats(MediaRecordStats),
39 UploadRequest(u64, RecordReq),
40}
41
42pub struct MediaRecordService {
43 req_id_seed: u64,
44 queue: VecDeque<Output>,
45 chunk_map: HashMap<u64, FileId>,
46 sessions: HashMap<u64, SessionRecord>,
47 worker_tx: Sender<worker::Input>,
48}
49
50impl MediaRecordService {
51 pub fn new(workers: usize, path: &str, max_mem_size: usize) -> Self {
52 let (mut worker, worker_tx) = UploadWorker::new(path, max_mem_size);
53 for i in 0..workers {
54 worker.start_child_worker(i);
55 }
56
57 tokio::spawn(async move {
58 loop {
59 if let Err(e) = worker.recv().await {
60 log::error!("worker error {e}");
61 }
62 }
63 });
64
65 Self {
66 req_id_seed: 0,
67 queue: VecDeque::new(),
68 sessions: HashMap::new(),
69 chunk_map: HashMap::new(),
70 worker_tx,
71 }
72 }
73
74 pub fn on_tick(&mut self, now: u64) {
75 for (_id, session) in self.sessions.iter_mut() {
76 if let Some((req, file)) = session.tick(now) {
77 Self::process_chunk(&mut self.req_id_seed, req, file, &mut self.queue, &mut self.chunk_map, &self.worker_tx);
78 }
79 }
80 self.sessions.retain(|_, session| !session.is_closed());
81 }
82
83 pub fn on_input(&mut self, now: u64, event: Input) {
84 match event {
85 Input::Event(session, ts, event) => self.on_record_event(now, session, ts, event),
86 Input::UploadResponse(req_id, res) => {
87 log::info!("[MediaWorkerService] received res for req {req_id}");
88 if let Some(file_id) = self.chunk_map.remove(&req_id) {
89 if let Err(e) = self.worker_tx.try_send(worker::Input::UploadLink(file_id, res.s3_uri)) {
90 log::error!("[MediaWorkerService] send record link to record controller worker error {e}");
91 }
92 }
93 }
94 }
95 }
96
97 pub fn pop_output(&mut self) -> Option<Output> {
98 self.queue.pop_front()
99 }
100}
101
102impl MediaRecordService {
103 fn on_record_event(&mut self, now_ms: u64, session: u64, event_ts: u64, event: SessionRecordEvent) {
104 let session = self.sessions.entry(session).or_insert_with(|| SessionRecord::new(session));
105 if let Some((req, file)) = session.push(now_ms, event_ts, event) {
106 Self::process_chunk(&mut self.req_id_seed, req, file, &mut self.queue, &mut self.chunk_map, &self.worker_tx);
107 }
108 }
109
110 fn process_chunk(req_seed: &mut u64, req: RecordReq, file: MemoryFile, queue: &mut VecDeque<Output>, chunk_map: &mut HashMap<u64, FileId>, worker_tx: &Sender<worker::Input>) {
111 let req_id = *req_seed;
112 *req_seed += 1;
113 log::info!("[MediaWorkerService] request upload uri with req_id {req_id}");
114 queue.push_back(Output::UploadRequest(req_id, req));
115 chunk_map.insert(req_id, file.id());
116 if let Err(e) = worker_tx.try_send(worker::Input::RecordChunk(file)) {
117 log::error!("[MediaWorkerService] send record chunk to record controller worker error {e}");
118 }
119 }
120}