1use std::{collections::VecDeque, fmt::Debug};
2
3use atm0s_sdn::{
4 base::{
5 NetIncomingMeta, NetOutgoingMeta, Service, ServiceBuilder, ServiceCtx, ServiceInput, ServiceOutput, ServiceSharedInput, ServiceWorker, ServiceWorkerCtx, ServiceWorkerInput,
6 ServiceWorkerOutput,
7 },
8 features::{data, FeaturesControl, FeaturesEvent},
9 NodeId, RouteRule, ServiceBroadcastLevel,
10};
11use media_server_protocol::{
12 cluster::ZoneId,
13 protobuf::{
14 self,
15 cluster_gateway::{gateway_event, ping_event::gateway_origin::Location},
16 },
17};
18use prost::Message as _;
19
20use crate::{
21 store::{GatewayStore, PingEvent},
22 NodeMetrics, ServiceKind, DATA_PORT, STORE_SERVICE_ID, STORE_SERVICE_NAME,
23};
24
25#[derive(Debug, Clone)]
26pub enum Control {
27 NodeStats(NodeMetrics),
28 FindNodeReq(u64, ServiceKind, Option<Location>),
29 FindDestReq(u64, ServiceKind, NodeId),
30 GetMediaStats,
31}
32
33#[derive(Debug, Clone)]
34pub enum Event {
35 MediaStats(u32, u32),
36 FindNodeRes(u64, Option<u32>),
37 FindDestRes(u64, Option<u32>),
38}
39
40pub struct GatewayStoreService<UserData, SC, SE, TC, TW> {
41 queue: VecDeque<ServiceOutput<UserData, FeaturesControl, SE, TW>>,
42 store: GatewayStore,
43 seq: u16,
44 shutdown: bool,
45 _tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>,
46}
47
48impl<UserData: Copy, SC, SE, TC, TW> GatewayStoreService<UserData, SC, SE, TC, TW>
49where
50 SC: From<Control> + TryInto<Control>,
51 SE: From<Event> + TryInto<Event>,
52{
53 pub fn new(zone: ZoneId, lat: f32, lon: f32, max_cpu: u8, max_memory: u8, max_disk: u8) -> Self {
54 Self {
55 store: GatewayStore::new(zone, Location { lat, lon }, max_cpu, max_memory, max_disk),
56 queue: VecDeque::from([ServiceOutput::FeatureControl(data::Control::DataListen(DATA_PORT).into())]),
57 seq: 0,
58 shutdown: false,
59 _tmp: std::marker::PhantomData,
60 }
61 }
62
63 fn handle_event(&mut self, now: u64, _port: u16, meta: NetIncomingMeta, data: &[u8]) -> Option<()> {
64 let from = meta.source?;
65 let req = protobuf::cluster_gateway::GatewayEvent::decode(data).ok()?;
66 let event = req.event?;
67 match event {
68 gateway_event::Event::Ping(ping) => {
69 let origin = ping.origin?;
70 self.store.on_ping(
71 now,
72 from,
73 PingEvent {
74 cpu: ping.cpu as u8,
75 memory: ping.memory as u8,
76 disk: ping.disk as u8,
77 origin,
78 webrtc: ping.webrtc,
79 rtpengine: ping.rtpengine,
80 },
81 )
82 }
83 }
84
85 Some(())
86 }
87}
88
89impl<UserData: Copy + Eq + Debug, SC, SE, TC, TW> Service<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW> for GatewayStoreService<UserData, SC, SE, TC, TW>
90where
91 SC: From<Control> + TryInto<Control> + Debug,
92 SE: From<Event> + TryInto<Event>,
93 TC: Debug,
94{
95 fn service_id(&self) -> u8 {
96 STORE_SERVICE_ID
97 }
98
99 fn service_name(&self) -> &str {
100 STORE_SERVICE_NAME
101 }
102
103 fn is_service_empty(&self) -> bool {
104 self.shutdown && self.queue.is_empty()
105 }
106
107 fn on_shared_input<'a>(&mut self, _ctx: &ServiceCtx, now: u64, input: ServiceSharedInput) {
108 match input {
109 ServiceSharedInput::Tick(_) => {
110 self.store.on_tick(now);
111 if let Some(ping) = self.store.pop_output() {
112 let rule = RouteRule::ToServices(STORE_SERVICE_ID, ServiceBroadcastLevel::Global, self.seq);
113 self.seq += 1;
114 let mut meta = NetOutgoingMeta::secure();
115 meta.source = true;
116 let data = protobuf::cluster_gateway::GatewayEvent {
117 event: Some(gateway_event::Event::Ping(protobuf::cluster_gateway::PingEvent {
118 cpu: ping.cpu as u32,
119 memory: ping.memory as u32,
120 disk: ping.disk as u32,
121 webrtc: ping.webrtc,
122 rtpengine: ping.rtpengine,
123 origin: Some(ping.origin),
124 })),
125 }
126 .encode_to_vec();
127 self.queue.push_back(ServiceOutput::FeatureControl(data::Control::DataSendRule(DATA_PORT, rule, meta, data).into()));
128 }
129 }
130 ServiceSharedInput::Connection(_) => {}
131 }
132 }
133
134 fn on_input(&mut self, _ctx: &ServiceCtx, now: u64, input: ServiceInput<UserData, FeaturesEvent, SC, TC>) {
135 match input {
136 ServiceInput::FeatureEvent(FeaturesEvent::Data(data::Event::Recv(port, meta, data))) => {
137 self.handle_event(now, port, meta, &data);
138 }
139 ServiceInput::Control(actor, control) => {
140 if let Ok(control) = control.try_into() {
141 match control {
142 Control::FindNodeReq(req_id, kind, location) => {
143 let out = self.store.best_for(kind, location);
144 self.queue.push_back(ServiceOutput::Event(actor, Event::FindNodeRes(req_id, out).into()));
145 }
146 Control::FindDestReq(req_id, kind, dest) => {
147 let out = self.store.dest_for(kind, dest);
148 self.queue.push_back(ServiceOutput::Event(actor, Event::FindDestRes(req_id, out).into()));
149 }
150 Control::NodeStats(metrics) => {
151 log::debug!("[GatewayStoreService] node metrics {:?}", metrics);
152 self.store.on_node_metrics(now, metrics);
153 }
154 Control::GetMediaStats => {
155 if let Some(stats) = self.store.local_stats() {
156 self.queue.push_back(ServiceOutput::Event(actor, Event::MediaStats(stats.live, stats.max).into()));
157 }
158 }
159 }
160 }
161 }
162 _ => {}
163 }
164 }
165
166 fn on_shutdown(&mut self, _ctx: &ServiceCtx, _now: u64) {
167 self.shutdown = true;
168 }
169
170 fn pop_output2(&mut self, _now: u64) -> Option<ServiceOutput<UserData, FeaturesControl, SE, TW>> {
171 self.queue.pop_front()
172 }
173}
174
175pub struct GatewayStoreServiceWorker<UserData, SC, SE, TC> {
176 queue: VecDeque<ServiceWorkerOutput<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC>>,
177 shutdown: bool,
178}
179
180impl<UserData, SC, SE, TC, TW> ServiceWorker<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW> for GatewayStoreServiceWorker<UserData, SC, SE, TC> {
181 fn service_id(&self) -> u8 {
182 STORE_SERVICE_ID
183 }
184
185 fn service_name(&self) -> &str {
186 STORE_SERVICE_NAME
187 }
188
189 fn is_service_empty(&self) -> bool {
190 self.shutdown && self.queue.is_empty()
191 }
192
193 fn on_tick(&mut self, _ctx: &ServiceWorkerCtx, _now: u64, _tick_count: u64) {}
194
195 fn on_input(&mut self, _ctx: &ServiceWorkerCtx, _now: u64, input: ServiceWorkerInput<UserData, FeaturesEvent, SC, TW>) {
196 match input {
197 ServiceWorkerInput::Control(owner, control) => self.queue.push_back(ServiceWorkerOutput::ForwardControlToController(owner, control)),
198 ServiceWorkerInput::FromController(_) => {}
199 ServiceWorkerInput::FeatureEvent(event) => {
200 log::info!("forward event to controller");
201 self.queue.push_back(ServiceWorkerOutput::ForwardFeatureEventToController(event))
202 }
203 }
204 }
205
206 fn on_shutdown(&mut self, _ctx: &ServiceWorkerCtx, _now: u64) {
207 self.shutdown = true;
208 }
209
210 fn pop_output2(&mut self, _now: u64) -> Option<ServiceWorkerOutput<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC>> {
211 self.queue.pop_front()
212 }
213}
214
215pub struct GatewayStoreServiceBuilder<UserData, SC, SE, TC, TW> {
216 _tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>,
217 zone: ZoneId,
218 lat: f32,
219 lon: f32,
220 max_memory: u8,
221 max_disk: u8,
222 max_cpu: u8,
223}
224
225impl<UserData, SC, SE, TC, TW> GatewayStoreServiceBuilder<UserData, SC, SE, TC, TW> {
226 pub fn new(zone: ZoneId, lat: f32, lon: f32, max_cpu: u8, max_memory: u8, max_disk: u8) -> Self {
227 Self {
228 zone,
229 lat,
230 lon,
231 _tmp: std::marker::PhantomData,
232 max_cpu,
233 max_memory,
234 max_disk,
235 }
236 }
237}
238
239impl<UserData, SC, SE, TC, TW> ServiceBuilder<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW> for GatewayStoreServiceBuilder<UserData, SC, SE, TC, TW>
240where
241 UserData: 'static + Debug + Send + Sync + Copy + Eq,
242 SC: 'static + Debug + Send + Sync + From<Control> + TryInto<Control>,
243 SE: 'static + Debug + Send + Sync + From<Event> + TryInto<Event>,
244 TC: 'static + Debug + Send + Sync,
245 TW: 'static + Debug + Send + Sync,
246{
247 fn service_id(&self) -> u8 {
248 STORE_SERVICE_ID
249 }
250
251 fn service_name(&self) -> &str {
252 STORE_SERVICE_NAME
253 }
254
255 fn discoverable(&self) -> bool {
256 true
257 }
258
259 fn create(&self) -> Box<dyn Service<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>> {
260 Box::new(GatewayStoreService::new(self.zone, self.lat, self.lon, self.max_cpu, self.max_memory, self.max_disk))
261 }
262
263 fn create_worker(&self) -> Box<dyn ServiceWorker<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>> {
264 Box::new(GatewayStoreServiceWorker {
265 queue: Default::default(),
266 shutdown: false,
267 })
268 }
269}