1use std::net::{IpAddr, Ipv4Addr, SocketAddr};
64use std::sync::mpsc::Receiver;
65
66use ustreamer_input::{AppAction, InputMapper};
67use ustreamer_proto::input::InputEvent;
68use ustreamer_quality::QualityController;
69
70pub const DEFAULT_STREAM_PORT: u16 = 8080;
72
73pub const DEFAULT_HTTP_PORT: u16 = 8090;
75
76#[derive(Debug, Clone, Copy)]
78pub struct StreamFrameSource<'a> {
79 pub instance: &'a wgpu::Instance,
80 pub device: &'a wgpu::Device,
81 pub queue: &'a wgpu::Queue,
82 pub texture: &'a wgpu::Texture,
83}
84
85pub trait StreamFrameProvider {
87 fn stream_frame_source(&self) -> StreamFrameSource<'_>;
88}
89
90pub trait AppActionSink {
92 fn apply_app_action(&mut self, action: AppAction) -> Option<String>;
93}
94
95pub trait MappedInputApp: AppActionSink {
97 fn input_mapper(&mut self) -> &mut InputMapper;
98
99 fn handle_input_event(&mut self, _event: &InputEvent) -> Option<String> {
100 None
101 }
102}
103
104pub trait RawInputApp {
106 fn handle_input_event(&mut self, event: InputEvent) -> Option<String>;
107}
108
109pub trait SessionLifecycle {
111 fn on_stream_ready(&mut self) {}
112
113 fn on_viewer_connected(&mut self, _session_id: u64) {}
114
115 fn on_viewer_disconnected(&mut self, _session_id: u64) {}
116}
117
118#[derive(Debug, Clone, Copy, PartialEq, Eq)]
120pub struct LocalStreamEndpoints {
121 pub stream: SocketAddr,
122 pub http: SocketAddr,
123}
124
125impl LocalStreamEndpoints {
126 pub fn loopback(stream_port: u16, http_port: u16) -> Self {
127 let loopback = IpAddr::V4(Ipv4Addr::LOCALHOST);
128 Self {
129 stream: SocketAddr::new(loopback, stream_port),
130 http: SocketAddr::new(loopback, http_port),
131 }
132 }
133}
134
135impl Default for LocalStreamEndpoints {
136 fn default() -> Self {
137 Self::loopback(DEFAULT_STREAM_PORT, DEFAULT_HTTP_PORT)
138 }
139}
140
141pub fn drain_mapped_input_events<T: MappedInputApp>(
144 input_rx: &Receiver<InputEvent>,
145 quality: &mut QualityController,
146 app: &mut T,
147) -> Option<String> {
148 let mut last_status = None;
149 while let Ok(event) = input_rx.try_recv() {
150 quality.on_input();
151 if let Some(status) = app.handle_input_event(&event) {
152 last_status = Some(status);
153 }
154 let actions = {
155 let mapper = app.input_mapper();
156 mapper.process(&event)
157 };
158 for action in actions {
159 if let Some(status) = app.apply_app_action(action) {
160 last_status = Some(status);
161 }
162 }
163 }
164 last_status
165}
166
167pub fn drain_raw_input_events<T: RawInputApp>(
169 input_rx: &Receiver<InputEvent>,
170 quality: &mut QualityController,
171 app: &mut T,
172) -> Option<String> {
173 let mut last_status = None;
174 while let Ok(event) = input_rx.try_recv() {
175 quality.on_input();
176 if let Some(status) = app.handle_input_event(event) {
177 last_status = Some(status);
178 }
179 }
180 last_status
181}
182
183#[cfg(test)]
184mod tests {
185 use std::net::IpAddr;
186 use std::sync::mpsc;
187
188 use ustreamer_input::{AppAction, InputMapper};
189 use ustreamer_proto::input::{InputEvent, ScrollMode};
190
191 use super::{
192 AppActionSink, DEFAULT_HTTP_PORT, DEFAULT_STREAM_PORT, LocalStreamEndpoints,
193 MappedInputApp, RawInputApp, drain_mapped_input_events, drain_raw_input_events,
194 };
195
196 #[derive(Default)]
197 struct TestMappedApp {
198 mapper: InputMapper,
199 seen_actions: Vec<AppAction>,
200 raw_events: Vec<InputEvent>,
201 }
202
203 impl AppActionSink for TestMappedApp {
204 fn apply_app_action(&mut self, action: AppAction) -> Option<String> {
205 self.seen_actions.push(action.clone());
206 match action {
207 AppAction::ScrollStep { delta } => Some(format!("scroll:{delta}")),
208 AppAction::PointerUpdate { .. } => Some("pointer".into()),
209 _ => None,
210 }
211 }
212 }
213
214 impl MappedInputApp for TestMappedApp {
215 fn input_mapper(&mut self) -> &mut InputMapper {
216 &mut self.mapper
217 }
218
219 fn handle_input_event(&mut self, event: &InputEvent) -> Option<String> {
220 self.raw_events.push(*event);
221 None
222 }
223 }
224
225 #[derive(Default)]
226 struct TestRawApp {
227 seen_events: Vec<InputEvent>,
228 }
229
230 impl RawInputApp for TestRawApp {
231 fn handle_input_event(&mut self, event: InputEvent) -> Option<String> {
232 self.seen_events.push(event);
233 Some(format!("events:{}", self.seen_events.len()))
234 }
235 }
236
237 #[test]
238 fn drain_mapped_input_events_processes_raw_events_and_actions() {
239 let (tx, rx) = mpsc::channel();
240 tx.send(InputEvent::PointerMove {
241 x: 0.25,
242 y: 0.5,
243 buttons: 1,
244 timestamp_ms: 1,
245 })
246 .unwrap();
247 tx.send(InputEvent::Scroll {
248 delta_x: 0.0,
249 delta_y: 12.0,
250 mode: ScrollMode::Pixels,
251 })
252 .unwrap();
253 drop(tx);
254
255 let mut quality = ustreamer_quality::QualityController::new(Default::default());
256 let mut app = TestMappedApp::default();
257 let status = drain_mapped_input_events(&rx, &mut quality, &mut app);
258
259 assert_eq!(status.as_deref(), Some("scroll:1"));
260 assert_eq!(app.raw_events.len(), 2);
261 assert!(
262 app.seen_actions
263 .iter()
264 .any(|action| matches!(action, AppAction::PointerUpdate { .. }))
265 );
266 assert!(
267 app.seen_actions
268 .iter()
269 .any(|action| matches!(action, AppAction::Rotate { .. }))
270 );
271 assert!(
272 app.seen_actions
273 .iter()
274 .any(|action| matches!(action, AppAction::ScrollStep { delta: 1 }))
275 );
276 }
277
278 #[test]
279 fn drain_raw_input_events_reports_last_status() {
280 let (tx, rx) = mpsc::channel();
281 tx.send(InputEvent::KeyDown { code: b'R' as u16 }).unwrap();
282 tx.send(InputEvent::KeyUp { code: b'R' as u16 }).unwrap();
283 drop(tx);
284
285 let mut quality = ustreamer_quality::QualityController::new(Default::default());
286 let mut app = TestRawApp::default();
287 let status = drain_raw_input_events(&rx, &mut quality, &mut app);
288
289 assert_eq!(status.as_deref(), Some("events:2"));
290 assert_eq!(app.seen_events.len(), 2);
291 }
292
293 #[test]
294 fn default_local_stream_endpoints_use_loopback_ports() {
295 let endpoints = LocalStreamEndpoints::default();
296 assert_eq!(endpoints.stream.port(), DEFAULT_STREAM_PORT);
297 assert_eq!(endpoints.http.port(), DEFAULT_HTTP_PORT);
298 assert!(matches!(endpoints.stream.ip(), IpAddr::V4(addr) if addr.is_loopback()));
299 assert!(matches!(endpoints.http.ip(), IpAddr::V4(addr) if addr.is_loopback()));
300 }
301}