atm0s_media_server_gateway/
store_service.rs

1use std::{collections::VecDeque, fmt::Debug};
2
3use atm0s_sdn::{
4    base::{
5        NetIncomingMeta, NetOutgoingMeta, Service, ServiceBuilder, ServiceCtx, ServiceInput, ServiceOutput, ServiceSharedInput, ServiceWorker, ServiceWorkerCtx, ServiceWorkerInput,
6        ServiceWorkerOutput,
7    },
8    features::{data, FeaturesControl, FeaturesEvent},
9    NodeId, RouteRule, ServiceBroadcastLevel,
10};
11use media_server_protocol::{
12    cluster::ZoneId,
13    protobuf::{
14        self,
15        cluster_gateway::{gateway_event, ping_event::gateway_origin::Location},
16    },
17};
18use prost::Message as _;
19
20use crate::{
21    store::{GatewayStore, PingEvent},
22    NodeMetrics, ServiceKind, DATA_PORT, STORE_SERVICE_ID, STORE_SERVICE_NAME,
23};
24
25#[derive(Debug, Clone)]
26pub enum Control {
27    NodeStats(NodeMetrics),
28    FindNodeReq(u64, ServiceKind, Option<Location>),
29    FindDestReq(u64, ServiceKind, NodeId),
30    GetMediaStats,
31}
32
/// Answers emitted by the gateway store service back to the requesting actor.
#[derive(Clone, Debug)]
pub enum Event {
    /// Local media statistics as `(live, max)`.
    MediaStats(u32, u32),
    /// Result of a `FindNodeReq`: the echoed request id and the selected
    /// node, or `None` when the store returned no candidate.
    FindNodeRes(u64, Option<u32>),
    /// Result of a `FindDestReq`: the echoed request id and the selected
    /// destination, or `None` when the store returned no candidate.
    FindDestRes(u64, Option<u32>),
}
39
40pub struct GatewayStoreService<UserData, SC, SE, TC, TW> {
41    queue: VecDeque<ServiceOutput<UserData, FeaturesControl, SE, TW>>,
42    store: GatewayStore,
43    seq: u16,
44    shutdown: bool,
45    _tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>,
46}
47
48impl<UserData: Copy, SC, SE, TC, TW> GatewayStoreService<UserData, SC, SE, TC, TW>
49where
50    SC: From<Control> + TryInto<Control>,
51    SE: From<Event> + TryInto<Event>,
52{
53    pub fn new(zone: ZoneId, lat: f32, lon: f32, max_cpu: u8, max_memory: u8, max_disk: u8) -> Self {
54        Self {
55            store: GatewayStore::new(zone, Location { lat, lon }, max_cpu, max_memory, max_disk),
56            queue: VecDeque::from([ServiceOutput::FeatureControl(data::Control::DataListen(DATA_PORT).into())]),
57            seq: 0,
58            shutdown: false,
59            _tmp: std::marker::PhantomData,
60        }
61    }
62
63    fn handle_event(&mut self, now: u64, _port: u16, meta: NetIncomingMeta, data: &[u8]) -> Option<()> {
64        let from = meta.source?;
65        let req = protobuf::cluster_gateway::GatewayEvent::decode(data).ok()?;
66        let event = req.event?;
67        match event {
68            gateway_event::Event::Ping(ping) => {
69                let origin = ping.origin?;
70                self.store.on_ping(
71                    now,
72                    from,
73                    PingEvent {
74                        cpu: ping.cpu as u8,
75                        memory: ping.memory as u8,
76                        disk: ping.disk as u8,
77                        origin,
78                        webrtc: ping.webrtc,
79                        rtpengine: ping.rtpengine,
80                    },
81                )
82            }
83        }
84
85        Some(())
86    }
87}
88
89impl<UserData: Copy + Eq + Debug, SC, SE, TC, TW> Service<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW> for GatewayStoreService<UserData, SC, SE, TC, TW>
90where
91    SC: From<Control> + TryInto<Control> + Debug,
92    SE: From<Event> + TryInto<Event>,
93    TC: Debug,
94{
95    fn service_id(&self) -> u8 {
96        STORE_SERVICE_ID
97    }
98
99    fn service_name(&self) -> &str {
100        STORE_SERVICE_NAME
101    }
102
103    fn is_service_empty(&self) -> bool {
104        self.shutdown && self.queue.is_empty()
105    }
106
107    fn on_shared_input<'a>(&mut self, _ctx: &ServiceCtx, now: u64, input: ServiceSharedInput) {
108        match input {
109            ServiceSharedInput::Tick(_) => {
110                self.store.on_tick(now);
111                if let Some(ping) = self.store.pop_output() {
112                    let rule = RouteRule::ToServices(STORE_SERVICE_ID, ServiceBroadcastLevel::Global, self.seq);
113                    self.seq += 1;
114                    let mut meta = NetOutgoingMeta::secure();
115                    meta.source = true;
116                    let data = protobuf::cluster_gateway::GatewayEvent {
117                        event: Some(gateway_event::Event::Ping(protobuf::cluster_gateway::PingEvent {
118                            cpu: ping.cpu as u32,
119                            memory: ping.memory as u32,
120                            disk: ping.disk as u32,
121                            webrtc: ping.webrtc,
122                            rtpengine: ping.rtpengine,
123                            origin: Some(ping.origin),
124                        })),
125                    }
126                    .encode_to_vec();
127                    self.queue.push_back(ServiceOutput::FeatureControl(data::Control::DataSendRule(DATA_PORT, rule, meta, data).into()));
128                }
129            }
130            ServiceSharedInput::Connection(_) => {}
131        }
132    }
133
134    fn on_input(&mut self, _ctx: &ServiceCtx, now: u64, input: ServiceInput<UserData, FeaturesEvent, SC, TC>) {
135        match input {
136            ServiceInput::FeatureEvent(FeaturesEvent::Data(data::Event::Recv(port, meta, data))) => {
137                self.handle_event(now, port, meta, &data);
138            }
139            ServiceInput::Control(actor, control) => {
140                if let Ok(control) = control.try_into() {
141                    match control {
142                        Control::FindNodeReq(req_id, kind, location) => {
143                            let out = self.store.best_for(kind, location);
144                            self.queue.push_back(ServiceOutput::Event(actor, Event::FindNodeRes(req_id, out).into()));
145                        }
146                        Control::FindDestReq(req_id, kind, dest) => {
147                            let out = self.store.dest_for(kind, dest);
148                            self.queue.push_back(ServiceOutput::Event(actor, Event::FindDestRes(req_id, out).into()));
149                        }
150                        Control::NodeStats(metrics) => {
151                            log::debug!("[GatewayStoreService] node metrics {:?}", metrics);
152                            self.store.on_node_metrics(now, metrics);
153                        }
154                        Control::GetMediaStats => {
155                            if let Some(stats) = self.store.local_stats() {
156                                self.queue.push_back(ServiceOutput::Event(actor, Event::MediaStats(stats.live, stats.max).into()));
157                            }
158                        }
159                    }
160                }
161            }
162            _ => {}
163        }
164    }
165
166    fn on_shutdown(&mut self, _ctx: &ServiceCtx, _now: u64) {
167        self.shutdown = true;
168    }
169
170    fn pop_output2(&mut self, _now: u64) -> Option<ServiceOutput<UserData, FeaturesControl, SE, TW>> {
171        self.queue.pop_front()
172    }
173}
174
175pub struct GatewayStoreServiceWorker<UserData, SC, SE, TC> {
176    queue: VecDeque<ServiceWorkerOutput<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC>>,
177    shutdown: bool,
178}
179
180impl<UserData, SC, SE, TC, TW> ServiceWorker<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW> for GatewayStoreServiceWorker<UserData, SC, SE, TC> {
181    fn service_id(&self) -> u8 {
182        STORE_SERVICE_ID
183    }
184
185    fn service_name(&self) -> &str {
186        STORE_SERVICE_NAME
187    }
188
189    fn is_service_empty(&self) -> bool {
190        self.shutdown && self.queue.is_empty()
191    }
192
193    fn on_tick(&mut self, _ctx: &ServiceWorkerCtx, _now: u64, _tick_count: u64) {}
194
195    fn on_input(&mut self, _ctx: &ServiceWorkerCtx, _now: u64, input: ServiceWorkerInput<UserData, FeaturesEvent, SC, TW>) {
196        match input {
197            ServiceWorkerInput::Control(owner, control) => self.queue.push_back(ServiceWorkerOutput::ForwardControlToController(owner, control)),
198            ServiceWorkerInput::FromController(_) => {}
199            ServiceWorkerInput::FeatureEvent(event) => {
200                log::info!("forward event to controller");
201                self.queue.push_back(ServiceWorkerOutput::ForwardFeatureEventToController(event))
202            }
203        }
204    }
205
206    fn on_shutdown(&mut self, _ctx: &ServiceWorkerCtx, _now: u64) {
207        self.shutdown = true;
208    }
209
210    fn pop_output2(&mut self, _now: u64) -> Option<ServiceWorkerOutput<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC>> {
211        self.queue.pop_front()
212    }
213}
214
215pub struct GatewayStoreServiceBuilder<UserData, SC, SE, TC, TW> {
216    _tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>,
217    zone: ZoneId,
218    lat: f32,
219    lon: f32,
220    max_memory: u8,
221    max_disk: u8,
222    max_cpu: u8,
223}
224
225impl<UserData, SC, SE, TC, TW> GatewayStoreServiceBuilder<UserData, SC, SE, TC, TW> {
226    pub fn new(zone: ZoneId, lat: f32, lon: f32, max_cpu: u8, max_memory: u8, max_disk: u8) -> Self {
227        Self {
228            zone,
229            lat,
230            lon,
231            _tmp: std::marker::PhantomData,
232            max_cpu,
233            max_memory,
234            max_disk,
235        }
236    }
237}
238
239impl<UserData, SC, SE, TC, TW> ServiceBuilder<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW> for GatewayStoreServiceBuilder<UserData, SC, SE, TC, TW>
240where
241    UserData: 'static + Debug + Send + Sync + Copy + Eq,
242    SC: 'static + Debug + Send + Sync + From<Control> + TryInto<Control>,
243    SE: 'static + Debug + Send + Sync + From<Event> + TryInto<Event>,
244    TC: 'static + Debug + Send + Sync,
245    TW: 'static + Debug + Send + Sync,
246{
247    fn service_id(&self) -> u8 {
248        STORE_SERVICE_ID
249    }
250
251    fn service_name(&self) -> &str {
252        STORE_SERVICE_NAME
253    }
254
255    fn discoverable(&self) -> bool {
256        true
257    }
258
259    fn create(&self) -> Box<dyn Service<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>> {
260        Box::new(GatewayStoreService::new(self.zone, self.lat, self.lon, self.max_cpu, self.max_memory, self.max_disk))
261    }
262
263    fn create_worker(&self) -> Box<dyn ServiceWorker<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>> {
264        Box::new(GatewayStoreServiceWorker {
265            queue: Default::default(),
266            shutdown: false,
267        })
268    }
269}