1use std::{
2 collections::HashMap,
3 fmt, mem,
4 path::PathBuf,
5 sync::Arc,
6 task::Waker,
7 time::{Duration, Instant},
8};
9
10use crate::Deadline;
11use parking_lot::Mutex;
12use zng_app_context::{AppScope, app_local};
13use zng_task::DEADLINE_APP;
14use zng_time::{INSTANT_APP, InstantMode};
15use zng_txt::Txt;
16use zng_var::{ResponderVar, ResponseVar, VARS_APP, Var, response_var};
17use zng_view_api::{DeviceEventsFilter, raw_input::InputDeviceEvent};
18
19use crate::{
20 APP, AppControlFlow, AppEventObserver, AppExtension, AppExtensionsInfo, DInstant, INSTANT,
21 event::{AnyEventArgs, CommandHandle, CommandInfoExt, CommandNameExt, EVENTS, EventPropagationHandle, command, event},
22 event_args,
23 shortcut::CommandShortcutExt,
24 shortcut::shortcut,
25 timer::TimersService,
26 update::{
27 ContextUpdates, EventUpdate, InfoUpdates, LayoutUpdates, RenderUpdates, UPDATES, UpdateOp, UpdateTrace, UpdatesTrace, WidgetUpdates,
28 },
29 view_process::{raw_device_events::InputDeviceId, *},
30 widget::WidgetId,
31 window::WindowId,
32};
33
34pub(crate) struct RunningApp<E: AppExtension> {
36 extensions: (AppIntrinsic, E),
37
38 receiver: flume::Receiver<AppEvent>,
39
40 loop_timer: LoopTimer,
41 loop_monitor: LoopMonitor,
42 last_wait_event: Instant,
43
44 pending_view_events: Vec<zng_view_api::Event>,
45 pending_view_frame_events: Vec<zng_view_api::window::EventFrameRendered>,
46 pending: ContextUpdates,
47
48 exited: bool,
49
50 _scope: AppScope,
52}
53impl<E: AppExtension> RunningApp<E> {
54 pub(crate) fn start(
55 scope: AppScope,
56 mut extensions: E,
57 is_headed: bool,
58 with_renderer: bool,
59 view_process_exe: Option<PathBuf>,
60 view_process_env: HashMap<Txt, Txt>,
61 ) -> Self {
62 let _s = tracing::debug_span!("APP::start").entered();
63
64 let (sender, receiver) = AppEventSender::new();
65
66 UPDATES.init(sender);
67
68 fn app_waker() {
69 UPDATES.update(None);
70 }
71 VARS_APP.init_app_waker(app_waker);
72 VARS_APP.init_modify_trace(UpdatesTrace::log_var);
73 DEADLINE_APP.init_deadline_service(crate::timer::deadline_service);
74 zng_var::animation::TRANSITIONABLE_APP.init_rgba_lerp(zng_color::lerp_rgba);
75
76 let mut info = AppExtensionsInfo::start();
77 {
78 let _t = INSTANT_APP.pause_for_update();
79 extensions.register(&mut info);
80 }
81
82 {
83 let mut sv = APP_PROCESS_SV.write();
84 sv.set_extensions(info);
85 }
86
87 if with_renderer && view_process_exe.is_none() {
88 zng_env::assert_inited();
89 }
90
91 #[cfg(not(target_arch = "wasm32"))]
92 let view_process_exe = view_process_exe.unwrap_or_else(|| std::env::current_exe().expect("current_exe"));
93 #[cfg(target_arch = "wasm32")]
94 let view_process_exe = std::path::PathBuf::from("<wasm>");
95
96 let process = AppIntrinsic::pre_init(is_headed, with_renderer, view_process_exe, view_process_env);
97
98 {
99 let _s = tracing::debug_span!("extensions.init").entered();
100 extensions.init();
101 }
102
103 let args = AppStartArgs { _private: () };
104 for h in zng_unique_id::hot_static_ref!(ON_APP_START).lock().iter_mut() {
105 h(&args)
106 }
107
108 RunningApp {
109 extensions: (process, extensions),
110
111 receiver,
112
113 loop_timer: LoopTimer::default(),
114 loop_monitor: LoopMonitor::default(),
115 last_wait_event: Instant::now(),
116
117 pending_view_events: Vec::with_capacity(100),
118 pending_view_frame_events: Vec::with_capacity(5),
119 pending: ContextUpdates {
120 events: Vec::with_capacity(100),
121 update: false,
122 info: false,
123 layout: false,
124 render: false,
125 update_widgets: WidgetUpdates::default(),
126 info_widgets: InfoUpdates::default(),
127 layout_widgets: LayoutUpdates::default(),
128 render_widgets: RenderUpdates::default(),
129 render_update_widgets: RenderUpdates::default(),
130 },
131 exited: false,
132
133 _scope: scope,
134 }
135 }
136
137 pub fn has_exited(&self) -> bool {
138 self.exited
139 }
140
141 pub fn notify_event<O: AppEventObserver>(&mut self, mut update: EventUpdate, observer: &mut O) {
143 let _scope = tracing::trace_span!("notify_event", event = update.event().name()).entered();
144
145 let _t = INSTANT_APP.pause_for_update();
146
147 update.event().on_update(&mut update);
148
149 self.extensions.event_preview(&mut update);
150 observer.event_preview(&mut update);
151 update.call_pre_actions();
152
153 self.extensions.event_ui(&mut update);
154 observer.event_ui(&mut update);
155
156 self.extensions.event(&mut update);
157 observer.event(&mut update);
158 update.call_pos_actions();
159 }
160
161 fn input_device_id(&mut self, id: zng_view_api::raw_input::InputDeviceId) -> InputDeviceId {
162 VIEW_PROCESS.input_device_id(id)
163 }
164
165 fn on_view_event<O: AppEventObserver>(&mut self, ev: zng_view_api::Event, observer: &mut O) {
167 use crate::view_process::raw_device_events::*;
168 use crate::view_process::raw_events::*;
169 use zng_view_api::Event;
170
171 fn window_id(id: zng_view_api::window::WindowId) -> WindowId {
172 WindowId::from_raw(id.get())
173 }
174
175 match ev {
176 Event::MouseMoved {
177 window: w_id,
178 device: d_id,
179 coalesced_pos,
180 position,
181 } => {
182 let args = RawMouseMovedArgs::now(window_id(w_id), self.input_device_id(d_id), coalesced_pos, position);
183 self.notify_event(RAW_MOUSE_MOVED_EVENT.new_update(args), observer);
184 }
185 Event::MouseEntered {
186 window: w_id,
187 device: d_id,
188 } => {
189 let args = RawMouseArgs::now(window_id(w_id), self.input_device_id(d_id));
190 self.notify_event(RAW_MOUSE_ENTERED_EVENT.new_update(args), observer);
191 }
192 Event::MouseLeft {
193 window: w_id,
194 device: d_id,
195 } => {
196 let args = RawMouseArgs::now(window_id(w_id), self.input_device_id(d_id));
197 self.notify_event(RAW_MOUSE_LEFT_EVENT.new_update(args), observer);
198 }
199 Event::WindowChanged(c) => {
200 let monitor_id = c.monitor.map(|id| VIEW_PROCESS.monitor_id(id));
201 let args = RawWindowChangedArgs::now(
202 window_id(c.window),
203 c.state,
204 c.position,
205 monitor_id,
206 c.size,
207 c.safe_padding,
208 c.cause,
209 c.frame_wait_id,
210 );
211 self.notify_event(RAW_WINDOW_CHANGED_EVENT.new_update(args), observer);
212 }
213 Event::DragHovered { window, data, allowed } => {
214 let args = RawDragHoveredArgs::now(window_id(window), data, allowed);
215 self.notify_event(RAW_DRAG_HOVERED_EVENT.new_update(args), observer);
216 }
217 Event::DragMoved {
218 window,
219 coalesced_pos,
220 position,
221 } => {
222 let args = RawDragMovedArgs::now(window_id(window), coalesced_pos, position);
223 self.notify_event(RAW_DRAG_MOVED_EVENT.new_update(args), observer);
224 }
225 Event::DragDropped {
226 window,
227 data,
228 allowed,
229 drop_id,
230 } => {
231 let args = RawDragDroppedArgs::now(window_id(window), data, allowed, drop_id);
232 self.notify_event(RAW_DRAG_DROPPED_EVENT.new_update(args), observer);
233 }
234 Event::DragCancelled { window } => {
235 let args = RawDragCancelledArgs::now(window_id(window));
236 self.notify_event(RAW_DRAG_CANCELLED_EVENT.new_update(args), observer);
237 }
238 Event::AppDragEnded { window, drag, applied } => {
239 let args = RawAppDragEndedArgs::now(window_id(window), drag, applied);
240 self.notify_event(RAW_APP_DRAG_ENDED_EVENT.new_update(args), observer);
241 }
242 Event::FocusChanged { prev, new } => {
243 let args = RawWindowFocusArgs::now(prev.map(window_id), new.map(window_id));
244 self.notify_event(RAW_WINDOW_FOCUS_EVENT.new_update(args), observer);
245 }
246 Event::KeyboardInput {
247 window: w_id,
248 device: d_id,
249 key_code,
250 state,
251 key,
252 key_location,
253 key_modified,
254 text,
255 } => {
256 let args = RawKeyInputArgs::now(
257 window_id(w_id),
258 self.input_device_id(d_id),
259 key_code,
260 key_location,
261 state,
262 key,
263 key_modified,
264 text,
265 );
266 self.notify_event(RAW_KEY_INPUT_EVENT.new_update(args), observer);
267 }
268 Event::Ime { window: w_id, ime } => {
269 let args = RawImeArgs::now(window_id(w_id), ime);
270 self.notify_event(RAW_IME_EVENT.new_update(args), observer);
271 }
272
273 Event::MouseWheel {
274 window: w_id,
275 device: d_id,
276 delta,
277 phase,
278 } => {
279 let args = RawMouseWheelArgs::now(window_id(w_id), self.input_device_id(d_id), delta, phase);
280 self.notify_event(RAW_MOUSE_WHEEL_EVENT.new_update(args), observer);
281 }
282 Event::MouseInput {
283 window: w_id,
284 device: d_id,
285 state,
286 button,
287 } => {
288 let args = RawMouseInputArgs::now(window_id(w_id), self.input_device_id(d_id), state, button);
289 self.notify_event(RAW_MOUSE_INPUT_EVENT.new_update(args), observer);
290 }
291 Event::TouchpadPressure {
292 window: w_id,
293 device: d_id,
294 pressure,
295 stage,
296 } => {
297 let args = RawTouchpadPressureArgs::now(window_id(w_id), self.input_device_id(d_id), pressure, stage);
298 self.notify_event(RAW_TOUCHPAD_PRESSURE_EVENT.new_update(args), observer);
299 }
300 Event::AxisMotion {
301 window: w_id,
302 device: d_id,
303 axis,
304 value,
305 } => {
306 let args = RawAxisMotionArgs::now(window_id(w_id), self.input_device_id(d_id), axis, value);
307 self.notify_event(RAW_AXIS_MOTION_EVENT.new_update(args), observer);
308 }
309 Event::Touch {
310 window: w_id,
311 device: d_id,
312 touches,
313 } => {
314 let args = RawTouchArgs::now(window_id(w_id), self.input_device_id(d_id), touches);
315 self.notify_event(RAW_TOUCH_EVENT.new_update(args), observer);
316 }
317 Event::ScaleFactorChanged {
318 monitor: id,
319 windows,
320 scale_factor,
321 } => {
322 let monitor_id = VIEW_PROCESS.monitor_id(id);
323 let windows: Vec<_> = windows.into_iter().map(window_id).collect();
324 let args = RawScaleFactorChangedArgs::now(monitor_id, windows, scale_factor);
325 self.notify_event(RAW_SCALE_FACTOR_CHANGED_EVENT.new_update(args), observer);
326 }
327 Event::MonitorsChanged(monitors) => {
328 let monitors: Vec<_> = monitors.into_iter().map(|(id, info)| (VIEW_PROCESS.monitor_id(id), info)).collect();
329 let args = RawMonitorsChangedArgs::now(monitors);
330 self.notify_event(RAW_MONITORS_CHANGED_EVENT.new_update(args), observer);
331 }
332 Event::AudioDevicesChanged(_audio_devices) => {
333 }
335 Event::WindowCloseRequested(w_id) => {
336 let args = RawWindowCloseRequestedArgs::now(window_id(w_id));
337 self.notify_event(RAW_WINDOW_CLOSE_REQUESTED_EVENT.new_update(args), observer);
338 }
339 Event::WindowOpened(w_id, data) => {
340 let w_id = window_id(w_id);
341 let (window, data) = VIEW_PROCESS.on_window_opened(w_id, data);
342 let args = RawWindowOpenArgs::now(w_id, window, data);
343 self.notify_event(RAW_WINDOW_OPEN_EVENT.new_update(args), observer);
344 }
345 Event::HeadlessOpened(w_id, data) => {
346 let w_id = window_id(w_id);
347 let (surface, data) = VIEW_PROCESS.on_headless_opened(w_id, data);
348 let args = RawHeadlessOpenArgs::now(w_id, surface, data);
349 self.notify_event(RAW_HEADLESS_OPEN_EVENT.new_update(args), observer);
350 }
351 Event::WindowOrHeadlessOpenError { id: w_id, error } => {
352 let w_id = window_id(w_id);
353 let args = RawWindowOrHeadlessOpenErrorArgs::now(w_id, error);
354 self.notify_event(RAW_WINDOW_OR_HEADLESS_OPEN_ERROR_EVENT.new_update(args), observer);
355 }
356 Event::WindowClosed(w_id) => {
357 let args = RawWindowCloseArgs::now(window_id(w_id));
358 self.notify_event(RAW_WINDOW_CLOSE_EVENT.new_update(args), observer);
359 }
360 Event::ImageMetadataLoaded {
361 image: id,
362 size,
363 ppi,
364 is_mask,
365 } => {
366 if let Some(img) = VIEW_PROCESS.on_image_metadata_loaded(id, size, ppi, is_mask) {
367 let args = RawImageArgs::now(img);
368 self.notify_event(RAW_IMAGE_METADATA_LOADED_EVENT.new_update(args), observer);
369 }
370 }
371 Event::ImagePartiallyLoaded {
372 image: id,
373 partial_size,
374 ppi,
375 is_opaque,
376 is_mask,
377 partial_pixels: partial_bgra8,
378 } => {
379 if let Some(img) = VIEW_PROCESS.on_image_partially_loaded(id, partial_size, ppi, is_opaque, is_mask, partial_bgra8) {
380 let args = RawImageArgs::now(img);
381 self.notify_event(RAW_IMAGE_PARTIALLY_LOADED_EVENT.new_update(args), observer);
382 }
383 }
384 Event::ImageLoaded(image) => {
385 if let Some(img) = VIEW_PROCESS.on_image_loaded(image) {
386 let args = RawImageArgs::now(img);
387 self.notify_event(RAW_IMAGE_LOADED_EVENT.new_update(args), observer);
388 }
389 }
390 Event::ImageLoadError { image: id, error } => {
391 if let Some(img) = VIEW_PROCESS.on_image_error(id, error) {
392 let args = RawImageArgs::now(img);
393 self.notify_event(RAW_IMAGE_LOAD_ERROR_EVENT.new_update(args), observer);
394 }
395 }
396 Event::ImageEncoded { image: id, format, data } => VIEW_PROCESS.on_image_encoded(id, format, data),
397 Event::ImageEncodeError { image: id, format, error } => {
398 VIEW_PROCESS.on_image_encode_error(id, format, error);
399 }
400 Event::FrameImageReady {
401 window: w_id,
402 frame: frame_id,
403 image: image_id,
404 selection,
405 } => {
406 if let Some(img) = VIEW_PROCESS.on_frame_image_ready(image_id) {
407 let args = RawFrameImageReadyArgs::now(img, window_id(w_id), frame_id, selection);
408 self.notify_event(RAW_FRAME_IMAGE_READY_EVENT.new_update(args), observer);
409 }
410 }
411
412 Event::AccessInit { window: w_id } => {
413 self.notify_event(crate::access::on_access_init(window_id(w_id)), observer);
414 }
415 Event::AccessCommand {
416 window: win_id,
417 target: wgt_id,
418 command,
419 } => {
420 if let Some(update) = crate::access::on_access_command(window_id(win_id), WidgetId::from_raw(wgt_id.0), command) {
421 self.notify_event(update, observer);
422 }
423 }
424 Event::AccessDeinit { window: w_id } => {
425 self.notify_event(crate::access::on_access_deinit(window_id(w_id)), observer);
426 }
427
428 Event::MsgDialogResponse(id, response) => {
430 VIEW_PROCESS.on_message_dlg_response(id, response);
431 }
432 Event::FileDialogResponse(id, response) => {
433 VIEW_PROCESS.on_file_dlg_response(id, response);
434 }
435
436 Event::ExtensionEvent(id, payload) => {
438 let args = RawExtensionEventArgs::now(id, payload);
439 self.notify_event(RAW_EXTENSION_EVENT.new_update(args), observer);
440 }
441
442 Event::FontsChanged => {
444 let args = RawFontChangedArgs::now();
445 self.notify_event(RAW_FONT_CHANGED_EVENT.new_update(args), observer);
446 }
447 Event::FontAaChanged(aa) => {
448 let args = RawFontAaChangedArgs::now(aa);
449 self.notify_event(RAW_FONT_AA_CHANGED_EVENT.new_update(args), observer);
450 }
451 Event::MultiClickConfigChanged(cfg) => {
452 let args = RawMultiClickConfigChangedArgs::now(cfg);
453 self.notify_event(RAW_MULTI_CLICK_CONFIG_CHANGED_EVENT.new_update(args), observer);
454 }
455 Event::AnimationsConfigChanged(cfg) => {
456 VARS_APP.set_sys_animations_enabled(cfg.enabled);
457 let args = RawAnimationsConfigChangedArgs::now(cfg);
458 self.notify_event(RAW_ANIMATIONS_CONFIG_CHANGED_EVENT.new_update(args), observer);
459 }
460 Event::KeyRepeatConfigChanged(cfg) => {
461 let args = RawKeyRepeatConfigChangedArgs::now(cfg);
462 self.notify_event(RAW_KEY_REPEAT_CONFIG_CHANGED_EVENT.new_update(args), observer);
463 }
464 Event::TouchConfigChanged(cfg) => {
465 let args = RawTouchConfigChangedArgs::now(cfg);
466 self.notify_event(RAW_TOUCH_CONFIG_CHANGED_EVENT.new_update(args), observer);
467 }
468 Event::LocaleChanged(cfg) => {
469 let args = RawLocaleChangedArgs::now(cfg);
470 self.notify_event(RAW_LOCALE_CONFIG_CHANGED_EVENT.new_update(args), observer);
471 }
472 Event::ColorsConfigChanged(cfg) => {
473 let args = RawColorsConfigChangedArgs::now(cfg);
474 self.notify_event(RAW_COLORS_CONFIG_CHANGED_EVENT.new_update(args), observer);
475 }
476 Event::ChromeConfigChanged(cfg) => {
477 let args = RawChromeConfigChangedArgs::now(cfg);
478 self.notify_event(RAW_CHROME_CONFIG_CHANGED_EVENT.new_update(args), observer);
479 }
480
481 Event::InputDevicesChanged(devices) => {
483 let devices: HashMap<_, _> = devices.into_iter().map(|(d_id, info)| (self.input_device_id(d_id), info)).collect();
484 INPUT_DEVICES.update(devices.clone());
485 let args = InputDevicesChangedArgs::now(devices);
486 self.notify_event(INPUT_DEVICES_CHANGED_EVENT.new_update(args), observer);
487 }
488 Event::InputDeviceEvent { device, event } => {
489 let d_id = self.input_device_id(device);
490 match event {
491 InputDeviceEvent::PointerMotion { delta } => {
492 let args = PointerMotionArgs::now(d_id, delta);
493 self.notify_event(POINTER_MOTION_EVENT.new_update(args), observer);
494 }
495 InputDeviceEvent::ScrollMotion { delta } => {
496 let args = ScrollMotionArgs::now(d_id, delta);
497 self.notify_event(SCROLL_MOTION_EVENT.new_update(args), observer);
498 }
499 InputDeviceEvent::AxisMotion { axis, value } => {
500 let args = AxisMotionArgs::now(d_id, axis, value);
501 self.notify_event(AXIS_MOTION_EVENT.new_update(args), observer);
502 }
503 InputDeviceEvent::Button { button, state } => {
504 let args = ButtonArgs::now(d_id, button, state);
505 self.notify_event(BUTTON_EVENT.new_update(args), observer);
506 }
507 InputDeviceEvent::Key { key_code, state } => {
508 let args = KeyArgs::now(d_id, key_code, state);
509 self.notify_event(KEY_EVENT.new_update(args), observer);
510 }
511 _ => {}
512 }
513 }
514
515 Event::LowMemory => {
516 LOW_MEMORY_EVENT.notify(LowMemoryArgs::now());
517 }
518
519 Event::RecoveredFromComponentPanic { component, recover, panic } => {
520 tracing::error!(
521 "view-process recovered from internal component panic\n component: {component}\n recover: {recover}\n```panic\n{panic}\n```"
522 );
523 }
524
525 Event::Inited(zng_view_api::Inited { .. }) | Event::Suspended | Event::Disconnected(_) | Event::FrameRendered(_) => {
527 unreachable!()
528 } _ => {}
531 }
532 }
533
534 fn on_view_rendered_event<O: AppEventObserver>(&mut self, ev: zng_view_api::window::EventFrameRendered, observer: &mut O) {
536 debug_assert!(ev.window != zng_view_api::window::WindowId::INVALID);
537 let window_id = WindowId::from_raw(ev.window.get());
538 let image = ev.frame_image.map(|img| VIEW_PROCESS.on_frame_image(img));
540 let args = crate::view_process::raw_events::RawFrameRenderedArgs::now(window_id, ev.frame, image);
541 self.notify_event(crate::view_process::raw_events::RAW_FRAME_RENDERED_EVENT.new_update(args), observer);
542 }
543
544 pub(crate) fn run_headed(mut self) {
545 let mut observer = ();
546 #[cfg(feature = "dyn_app_extension")]
547 let mut observer = observer.as_dyn();
548
549 self.apply_updates(&mut observer);
550 self.apply_update_events(&mut observer);
551 let mut wait = false;
552 loop {
553 wait = match self.poll_impl(wait, &mut observer) {
554 AppControlFlow::Poll => false,
555 AppControlFlow::Wait => true,
556 AppControlFlow::Exit => break,
557 };
558 }
559 }
560
561 fn push_coalesce<O: AppEventObserver>(&mut self, ev: AppEvent, observer: &mut O) {
562 match ev {
563 AppEvent::ViewEvent(ev) => match ev {
564 zng_view_api::Event::FrameRendered(ev) => {
565 if ev.window == zng_view_api::window::WindowId::INVALID {
566 tracing::error!("ignored rendered event for invalid window id, {ev:?}");
567 return;
568 }
569
570 let window = WindowId::from_raw(ev.window.get());
571
572 {
574 if VIEW_PROCESS.is_available() {
575 VIEW_PROCESS.on_frame_rendered(window);
576 }
577 }
578
579 #[cfg(debug_assertions)]
580 if self.pending_view_frame_events.iter().any(|e| e.window == ev.window) {
581 tracing::warn!("window `{window:?}` probably sent a frame request without awaiting renderer idle");
582 }
583
584 self.pending_view_frame_events.push(ev);
585 }
586 zng_view_api::Event::Pong(count) => VIEW_PROCESS.on_pong(count),
587 zng_view_api::Event::Inited(zng_view_api::Inited {
588 generation,
589 is_respawn,
590 extensions,
591 ..
592 }) => {
593 if is_respawn {
595 VIEW_PROCESS.on_respawned(generation);
596 APP_PROCESS_SV.read().is_suspended.set(false);
597 }
598
599 VIEW_PROCESS.handle_inited(generation, extensions.clone());
600
601 let args = crate::view_process::ViewProcessInitedArgs::now(generation, is_respawn, extensions);
602 self.notify_event(VIEW_PROCESS_INITED_EVENT.new_update(args), observer);
603 }
604 zng_view_api::Event::Suspended => {
605 VIEW_PROCESS.handle_suspended();
606 let args = crate::view_process::ViewProcessSuspendedArgs::now();
607 self.notify_event(VIEW_PROCESS_SUSPENDED_EVENT.new_update(args), observer);
608 APP_PROCESS_SV.read().is_suspended.set(true);
609 }
610 zng_view_api::Event::Disconnected(vp_gen) => {
611 VIEW_PROCESS.handle_disconnect(vp_gen);
613 }
614 ev => {
615 if let Some(last) = self.pending_view_events.last_mut() {
616 match last.coalesce(ev) {
617 Ok(()) => {}
618 Err(ev) => self.pending_view_events.push(ev),
619 }
620 } else {
621 self.pending_view_events.push(ev);
622 }
623 }
624 },
625 AppEvent::Event(ev) => EVENTS.notify(ev.get()),
626 AppEvent::Update(op, target) => {
627 UPDATES.update_op(op, target);
628 }
629 AppEvent::CheckUpdate => {}
630 AppEvent::ResumeUnwind(p) => std::panic::resume_unwind(p),
631 }
632 }
633
634 fn has_pending_updates(&mut self) -> bool {
635 !self.pending_view_events.is_empty() || self.pending.has_updates() || UPDATES.has_pending_updates() || !self.receiver.is_empty()
636 }
637
638 pub(crate) fn poll<O: AppEventObserver>(&mut self, wait_app_event: bool, observer: &mut O) -> AppControlFlow {
639 #[cfg(feature = "dyn_app_extension")]
640 let mut observer = observer.as_dyn();
641 #[cfg(feature = "dyn_app_extension")]
642 let observer = &mut observer;
643 self.poll_impl(wait_app_event, observer)
644 }
645 fn poll_impl<O: AppEventObserver>(&mut self, wait_app_event: bool, observer: &mut O) -> AppControlFlow {
646 let mut disconnected = false;
647
648 if self.exited {
649 return AppControlFlow::Exit;
650 }
651
652 if wait_app_event {
653 let idle = tracing::debug_span!("<idle>", ended_by = tracing::field::Empty).entered();
654
655 let timer = if self.view_is_busy() { None } else { self.loop_timer.poll() };
656 const PING_TIMER: Duration = Duration::from_secs(10);
657
658 match self.receiver.recv_deadline_sp(timer.unwrap_or(Deadline::timeout(PING_TIMER))) {
659 Ok(ev) => {
660 idle.record("ended_by", "event");
661 drop(idle);
662 self.last_wait_event = Instant::now();
663 self.push_coalesce(ev, observer)
664 }
665 Err(e) => match e {
666 flume::RecvTimeoutError::Timeout => {
667 if timer.is_none() {
668 idle.record("ended_by", "timeout (ping)");
669 } else {
670 idle.record("ended_by", "timeout");
671 }
672 if self.last_wait_event.elapsed() > PING_TIMER && !VIEW_PROCESS.is_same_process() && VIEW_PROCESS.is_connected() {
673 VIEW_PROCESS.ping();
674 }
675 }
676 flume::RecvTimeoutError::Disconnected => {
677 idle.record("ended_by", "disconnected");
678 disconnected = true
679 }
680 },
681 }
682 }
683 loop {
684 match self.receiver.try_recv() {
685 Ok(ev) => self.push_coalesce(ev, observer),
686 Err(e) => match e {
687 flume::TryRecvError::Empty => break,
688 flume::TryRecvError::Disconnected => {
689 disconnected = true;
690 break;
691 }
692 },
693 }
694 }
695 if disconnected {
696 panic!("app events channel disconnected");
697 }
698
699 if self.view_is_busy() {
700 return AppControlFlow::Wait;
701 }
702
703 UPDATES.on_app_awake();
704
705 let updated_timers = self.loop_timer.awake();
707 if updated_timers {
708 UPDATES.update_timers(&mut self.loop_timer);
710 self.apply_updates(observer);
711 }
712
713 let mut events = mem::take(&mut self.pending_view_events);
714 for ev in events.drain(..) {
715 self.on_view_event(ev, observer);
716 self.apply_updates(observer);
717 }
718 debug_assert!(self.pending_view_events.is_empty());
719 self.pending_view_events = events; let mut events = mem::take(&mut self.pending_view_frame_events);
722 for ev in events.drain(..) {
723 self.on_view_rendered_event(ev, observer);
724 }
725 self.pending_view_frame_events = events;
726
727 if self.has_pending_updates() {
728 self.apply_updates(observer);
729 self.apply_update_events(observer);
730 }
731
732 if self.view_is_busy() {
733 return AppControlFlow::Wait;
734 }
735
736 self.finish_frame(observer);
737
738 UPDATES.next_deadline(&mut self.loop_timer);
739
740 if self.extensions.0.exit() {
741 UPDATES.on_app_sleep();
742 self.exited = true;
743 AppControlFlow::Exit
744 } else if self.has_pending_updates() || UPDATES.has_pending_layout_or_render() {
745 AppControlFlow::Poll
746 } else {
747 UPDATES.on_app_sleep();
748 AppControlFlow::Wait
749 }
750 }
751
752 fn apply_updates<O: AppEventObserver>(&mut self, observer: &mut O) {
754 let _s = tracing::debug_span!("apply_updates").entered();
755
756 let mut run = true;
757 while run {
758 run = self.loop_monitor.update(|| {
759 let mut any = false;
760
761 self.pending |= UPDATES.apply_info();
762 if mem::take(&mut self.pending.info) {
763 any = true;
764 let _s = tracing::debug_span!("info").entered();
765
766 let mut info_widgets = mem::take(&mut self.pending.info_widgets);
767
768 let _t = INSTANT_APP.pause_for_update();
769
770 {
771 let _s = tracing::debug_span!("ext.info").entered();
772 self.extensions.info(&mut info_widgets);
773 }
774 {
775 let _s = tracing::debug_span!("obs.info").entered();
776 observer.info(&mut info_widgets);
777 }
778 }
779
780 self.pending |= UPDATES.apply_updates();
781 TimersService::notify();
782 if mem::take(&mut self.pending.update) {
783 any = true;
784 let _s = tracing::debug_span!("update").entered();
785
786 let mut update_widgets = mem::take(&mut self.pending.update_widgets);
787
788 let _t = INSTANT_APP.pause_for_update();
789
790 {
791 let _s = tracing::debug_span!("ext.update_preview").entered();
792 self.extensions.update_preview();
793 }
794 {
795 let _s = tracing::debug_span!("obs.update_preview").entered();
796 observer.update_preview();
797 }
798 UPDATES.on_pre_updates();
799
800 {
801 let _s = tracing::debug_span!("ext.update_ui").entered();
802 self.extensions.update_ui(&mut update_widgets);
803 }
804 {
805 let _s = tracing::debug_span!("obs.update_ui").entered();
806 observer.update_ui(&mut update_widgets);
807 }
808
809 {
810 let _s = tracing::debug_span!("ext.update").entered();
811 self.extensions.update();
812 }
813 {
814 let _s = tracing::debug_span!("obs.update").entered();
815 observer.update();
816 }
817 UPDATES.on_updates();
818 }
819
820 any
821 });
822 }
823 }
824
825 fn apply_update_events<O: AppEventObserver>(&mut self, observer: &mut O) {
827 let _s = tracing::debug_span!("apply_update_events").entered();
828
829 loop {
830 let events: Vec<_> = self.pending.events.drain(..).collect();
831 if events.is_empty() {
832 break;
833 }
834 for mut update in events {
835 let _s = tracing::debug_span!("update_event", ?update).entered();
836
837 self.loop_monitor.maybe_trace(|| {
838 let _t = INSTANT_APP.pause_for_update();
839
840 {
841 let _s = tracing::debug_span!("ext.event_preview").entered();
842 self.extensions.event_preview(&mut update);
843 }
844 {
845 let _s = tracing::debug_span!("obs.event_preview").entered();
846 observer.event_preview(&mut update);
847 }
848 update.call_pre_actions();
849
850 {
851 let _s = tracing::debug_span!("ext.event_ui").entered();
852 self.extensions.event_ui(&mut update);
853 }
854 {
855 let _s = tracing::debug_span!("obs.event_ui").entered();
856 observer.event_ui(&mut update);
857 }
858 {
859 let _s = tracing::debug_span!("ext.event").entered();
860 self.extensions.event(&mut update);
861 }
862 {
863 let _s = tracing::debug_span!("obs.event").entered();
864 observer.event(&mut update);
865 }
866 update.call_pos_actions();
867 });
868
869 self.apply_updates(observer);
870 }
871 }
872 }
873
874 fn view_is_busy(&mut self) -> bool {
875 VIEW_PROCESS.is_available() && VIEW_PROCESS.pending_frames() > 0
876 }
877
878 fn finish_frame<O: AppEventObserver>(&mut self, observer: &mut O) {
880 debug_assert!(!self.view_is_busy());
881
882 self.pending |= UPDATES.apply_layout_render();
883
884 while mem::take(&mut self.pending.layout) {
885 let _s = tracing::debug_span!("apply_layout").entered();
886
887 let mut layout_widgets = mem::take(&mut self.pending.layout_widgets);
888
889 self.loop_monitor.maybe_trace(|| {
890 let _t = INSTANT_APP.pause_for_update();
891
892 {
893 let _s = tracing::debug_span!("ext.layout").entered();
894 self.extensions.layout(&mut layout_widgets);
895 }
896 {
897 let _s = tracing::debug_span!("obs.layout").entered();
898 observer.layout(&mut layout_widgets);
899 }
900 });
901
902 self.apply_updates(observer);
903 self.apply_update_events(observer);
904 self.pending |= UPDATES.apply_layout_render();
905 }
906
907 if mem::take(&mut self.pending.render) {
908 let _s = tracing::debug_span!("apply_render").entered();
909
910 let mut render_widgets = mem::take(&mut self.pending.render_widgets);
911 let mut render_update_widgets = mem::take(&mut self.pending.render_update_widgets);
912
913 let _t = INSTANT_APP.pause_for_update();
914
915 {
916 let _s = tracing::debug_span!("ext.render").entered();
917 self.extensions.render(&mut render_widgets, &mut render_update_widgets);
918 }
919 {
920 let _s = tracing::debug_span!("obs.render").entered();
921 observer.render(&mut render_widgets, &mut render_update_widgets);
922 }
923 }
924
925 self.loop_monitor.finish_frame();
926 }
927}
928impl<E: AppExtension> Drop for RunningApp<E> {
929 fn drop(&mut self) {
930 let _s = tracing::debug_span!("ext.deinit").entered();
931 self.extensions.deinit();
932 VIEW_PROCESS.exit();
933 }
934}
935
936pub struct AppStartArgs {
941 _private: (),
942}
943
944pub fn on_app_start(handler: impl FnMut(&AppStartArgs) + Send + 'static) {
950 zng_unique_id::hot_static_ref!(ON_APP_START).lock().push(Box::new(handler))
951}
952zng_unique_id::hot_static! {
953 static ON_APP_START: Mutex<Vec<AppStartHandler>> = Mutex::new(vec![]);
954}
955type AppStartHandler = Box<dyn FnMut(&AppStartArgs) + Send + 'static>;
956
957#[derive(Debug)]
959pub(crate) struct LoopTimer {
960 now: DInstant,
961 deadline: Option<Deadline>,
962}
963impl Default for LoopTimer {
964 fn default() -> Self {
965 Self {
966 now: INSTANT.now(),
967 deadline: None,
968 }
969 }
970}
971impl LoopTimer {
972 pub fn elapsed(&mut self, deadline: Deadline) -> bool {
975 if deadline.0 <= self.now {
976 true
977 } else {
978 self.register(deadline);
979 false
980 }
981 }
982
983 pub fn register(&mut self, deadline: Deadline) {
985 if let Some(d) = &mut self.deadline {
986 if deadline < *d {
987 *d = deadline;
988 }
989 } else {
990 self.deadline = Some(deadline)
991 }
992 }
993
994 pub(crate) fn poll(&mut self) -> Option<Deadline> {
996 self.deadline
997 }
998
999 pub(crate) fn awake(&mut self) -> bool {
1001 self.now = INSTANT.now();
1002 if let Some(d) = self.deadline
1003 && d.0 <= self.now
1004 {
1005 self.deadline = None;
1006 return true;
1007 }
1008 false
1009 }
1010
1011 pub fn now(&self) -> DInstant {
1013 self.now
1014 }
1015}
1016impl zng_var::animation::AnimationTimer for LoopTimer {
1017 fn elapsed(&mut self, deadline: Deadline) -> bool {
1018 self.elapsed(deadline)
1019 }
1020
1021 fn register(&mut self, deadline: Deadline) {
1022 self.register(deadline)
1023 }
1024
1025 fn now(&self) -> DInstant {
1026 self.now()
1027 }
1028}
1029
1030#[derive(Default)]
1031struct LoopMonitor {
1032 update_count: u16,
1033 skipped: bool,
1034 trace: Vec<UpdateTrace>,
1035}
1036impl LoopMonitor {
1037 pub fn update(&mut self, update_once: impl FnOnce() -> bool) -> bool {
1039 self.update_count += 1;
1040
1041 if self.update_count < 500 {
1042 update_once()
1043 } else if self.update_count < 1000 {
1044 UpdatesTrace::collect_trace(&mut self.trace, update_once)
1045 } else if self.update_count == 1000 {
1046 self.skipped = true;
1047 let trace = UpdatesTrace::format_trace(mem::take(&mut self.trace));
1048 tracing::error!(
1049 "updated 1000 times without rendering, probably stuck in an infinite loop\n\
1050 will start skipping updates to render and poll system events\n\
1051 top 20 most frequent update requests (in 500 cycles):\n\
1052 {trace}\n\
1053 you can use `UpdatesTraceUiNodeExt` and `updates_trace_event` to refine the trace"
1054 );
1055 false
1056 } else if self.update_count == 1500 {
1057 self.update_count = 1001;
1058 false
1059 } else {
1060 update_once()
1061 }
1062 }
1063
1064 pub fn maybe_trace(&mut self, notify_once: impl FnOnce()) {
1065 if (500..1000).contains(&self.update_count) {
1066 UpdatesTrace::collect_trace(&mut self.trace, notify_once);
1067 } else {
1068 notify_once();
1069 }
1070 }
1071
1072 pub fn finish_frame(&mut self) {
1073 if !self.skipped {
1074 self.skipped = false;
1075 self.update_count = 0;
1076 self.trace = vec![];
1077 }
1078 }
1079}
1080
1081impl APP {
1082 pub fn exit(&self) -> ResponseVar<ExitCancelled> {
1091 APP_PROCESS_SV.write().exit()
1092 }
1093
1094 pub fn is_suspended(&self) -> Var<bool> {
1102 APP_PROCESS_SV.read().is_suspended.read_only()
1103 }
1104}
1105
1106impl APP {
1110 pub fn pause_time_for_update(&self) -> Var<bool> {
1116 APP_PROCESS_SV.read().pause_time_for_updates.clone()
1117 }
1118
1119 pub fn start_manual_time(&self) {
1127 INSTANT_APP.set_mode(InstantMode::Manual);
1128 INSTANT_APP.set_now(INSTANT.now());
1129 UPDATES.update(None);
1130 }
1131
1132 pub fn advance_manual_time(&self, advance: Duration) {
1143 INSTANT_APP.advance_now(advance);
1144 UPDATES.update(None);
1145 }
1146
1147 pub fn set_manual_time(&self, now: DInstant) {
1156 INSTANT_APP.set_now(now);
1157 UPDATES.update(None);
1158 }
1159
1160 pub fn end_manual_time(&self) {
1162 INSTANT_APP.set_mode(match APP.pause_time_for_update().get() {
1163 true => InstantMode::UpdatePaused,
1164 false => InstantMode::Now,
1165 });
1166 UPDATES.update(None);
1167 }
1168}
1169
1170command! {
1171 pub static EXIT_CMD = {
1175 l10n!: true,
1176 name: "Exit",
1177 info: "Close all windows and exit",
1178 shortcut: shortcut!(Exit),
1179 };
1180}
1181
1182#[derive(Clone, Copy, Debug, PartialEq, Eq)]
1186pub struct ExitCancelled;
1187impl fmt::Display for ExitCancelled {
1188 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1189 write!(f, "exit request cancelled")
1190 }
1191}
1192
1193struct AppIntrinsic {
1194 exit_handle: CommandHandle,
1195 pending_exit: Option<PendingExit>,
1196}
1197struct PendingExit {
1198 handle: EventPropagationHandle,
1199 response: ResponderVar<ExitCancelled>,
1200}
1201impl AppIntrinsic {
1202 pub(super) fn pre_init(is_headed: bool, with_renderer: bool, view_process_exe: PathBuf, view_process_env: HashMap<Txt, Txt>) -> Self {
1204 APP_PROCESS_SV
1205 .read()
1206 .pause_time_for_updates
1207 .hook(|a| {
1208 if !matches!(INSTANT.mode(), zng_time::InstantMode::Manual) {
1209 if *a.value() {
1210 INSTANT_APP.set_mode(InstantMode::UpdatePaused);
1211 } else {
1212 INSTANT_APP.set_mode(InstantMode::Now);
1213 }
1214 }
1215 true
1216 })
1217 .perm();
1218
1219 if is_headed {
1220 debug_assert!(with_renderer);
1221
1222 let view_evs_sender = UPDATES.sender();
1223 VIEW_PROCESS.start(view_process_exe, view_process_env, false, move |ev| {
1224 let _ = view_evs_sender.send_view_event(ev);
1225 });
1226 } else if with_renderer {
1227 let view_evs_sender = UPDATES.sender();
1228 VIEW_PROCESS.start(view_process_exe, view_process_env, true, move |ev| {
1229 let _ = view_evs_sender.send_view_event(ev);
1230 });
1231 }
1232
1233 AppIntrinsic {
1234 exit_handle: EXIT_CMD.subscribe(true),
1235 pending_exit: None,
1236 }
1237 }
1238
1239 pub(super) fn exit(&mut self) -> bool {
1241 if let Some(pending) = self.pending_exit.take() {
1242 if pending.handle.is_stopped() {
1243 pending.response.respond(ExitCancelled);
1244 false
1245 } else {
1246 true
1247 }
1248 } else {
1249 false
1250 }
1251 }
1252}
1253impl AppExtension for AppIntrinsic {
1254 fn event_preview(&mut self, update: &mut EventUpdate) {
1255 if VIEW_PROCESS_INITED_EVENT.has(update) {
1256 let filter = APP_PROCESS_SV.read().device_events_filter.get();
1257 if !filter.is_empty()
1258 && let Err(e) = VIEW_PROCESS.set_device_events_filter(filter)
1259 {
1260 tracing::error!("cannot set device events on the view-process, {e}");
1261 }
1262 } else if let Some(args) = EXIT_CMD.on(update) {
1263 args.handle_enabled(&self.exit_handle, |_| {
1264 APP.exit();
1265 });
1266 }
1267 }
1268
1269 fn update(&mut self) {
1270 let mut sv = APP_PROCESS_SV.write();
1271 if let Some(filter) = sv.device_events_filter.get_new()
1272 && let Err(e) = VIEW_PROCESS.set_device_events_filter(filter)
1273 {
1274 tracing::error!("cannot set device events on the view-process, {e}");
1275 }
1276 if let Some(response) = sv.take_requests() {
1277 let args = ExitRequestedArgs::now();
1278 self.pending_exit = Some(PendingExit {
1279 handle: args.propagation().clone(),
1280 response,
1281 });
1282 EXIT_REQUESTED_EVENT.notify(args);
1283 }
1284 }
1285}
1286
1287pub(crate) fn assert_not_view_process() {
1288 if zng_view_api::ViewConfig::from_env().is_some() {
1289 panic!("cannot start App in view-process");
1290 }
1291}
1292#[cfg(feature = "deadlock_detection")]
1297pub fn spawn_deadlock_detection() {
1298 use parking_lot::deadlock;
1299 use std::{
1300 sync::atomic::{self, AtomicBool},
1301 thread,
1302 time::*,
1303 };
1304
1305 static CHECK_RUNNING: AtomicBool = AtomicBool::new(false);
1306
1307 if CHECK_RUNNING.swap(true, atomic::Ordering::SeqCst) {
1308 return;
1309 }
1310
1311 thread::spawn(|| {
1312 loop {
1313 thread::sleep(Duration::from_secs(10));
1314
1315 let deadlocks = deadlock::check_deadlock();
1316 if deadlocks.is_empty() {
1317 continue;
1318 }
1319
1320 use std::fmt::Write;
1321 let mut msg = String::new();
1322
1323 let _ = writeln!(&mut msg, "{} deadlocks detected", deadlocks.len());
1324 for (i, threads) in deadlocks.iter().enumerate() {
1325 let _ = writeln!(&mut msg, "Deadlock #{}, {} threads", i, threads.len());
1326 for t in threads {
1327 let _ = writeln!(&mut msg, "Thread Id {:#?}", t.thread_id());
1328 let _ = writeln!(&mut msg, "{:#?}", t.backtrace());
1329 }
1330 }
1331
1332 #[cfg(not(feature = "test_util"))]
1333 eprint!("{msg}");
1334
1335 #[cfg(feature = "test_util")]
1336 {
1337 use std::io::Write;
1340 let _ = write!(&mut std::io::stderr(), "{msg}");
1341 zng_env::exit(-1);
1342 }
1343 }
1344 });
1345}
1346#[cfg(not(feature = "deadlock_detection"))]
1351pub fn spawn_deadlock_detection() {}
1352
1353app_local! {
1354 pub(super) static APP_PROCESS_SV: AppProcessService = AppProcessService {
1355 exit_requests: None,
1356 extensions: None,
1357 device_events_filter: zng_var::var(Default::default()),
1358 pause_time_for_updates: zng_var::var(true),
1359 is_suspended: zng_var::var(false),
1360 };
1361}
1362
1363pub(super) struct AppProcessService {
1364 exit_requests: Option<ResponderVar<ExitCancelled>>,
1365 extensions: Option<Arc<AppExtensionsInfo>>,
1366 pub(crate) device_events_filter: Var<DeviceEventsFilter>,
1367 pause_time_for_updates: Var<bool>,
1368 is_suspended: Var<bool>,
1369}
1370impl AppProcessService {
1371 pub(super) fn take_requests(&mut self) -> Option<ResponderVar<ExitCancelled>> {
1372 self.exit_requests.take()
1373 }
1374
1375 fn exit(&mut self) -> ResponseVar<ExitCancelled> {
1376 if let Some(r) = &self.exit_requests {
1377 r.response_var()
1378 } else {
1379 let (responder, response) = response_var();
1380 self.exit_requests = Some(responder);
1381 UPDATES.update(None);
1382 response
1383 }
1384 }
1385
1386 pub(super) fn extensions(&self) -> Arc<AppExtensionsInfo> {
1387 self.extensions
1388 .clone()
1389 .unwrap_or_else(|| Arc::new(AppExtensionsInfo { infos: vec![] }))
1390 }
1391
1392 pub(super) fn set_extensions(&mut self, info: AppExtensionsInfo) {
1393 self.extensions = Some(Arc::new(info));
1394 }
1395
1396 pub(super) fn is_running(&self) -> bool {
1397 self.extensions.is_some()
1398 }
1399}
1400
1401#[derive(Debug)]
1403#[allow(clippy::large_enum_variant)] pub(crate) enum AppEvent {
1405 ViewEvent(zng_view_api::Event),
1407 Event(crate::event::EventUpdateMsg),
1409 Update(UpdateOp, Option<WidgetId>),
1411 ResumeUnwind(PanicPayload),
1413 CheckUpdate,
1415}
1416
1417#[derive(Clone)]
1423pub struct AppEventSender(flume::Sender<AppEvent>);
1424impl AppEventSender {
1425 pub(crate) fn new() -> (Self, flume::Receiver<AppEvent>) {
1426 let (sender, receiver) = flume::unbounded();
1427 (Self(sender), receiver)
1428 }
1429
1430 #[allow(clippy::result_large_err)] fn send_app_event(&self, event: AppEvent) -> Result<(), AppChannelError> {
1432 self.0.send(event).map_err(|_| AppChannelError::Disconnected)
1433 }
1434
1435 #[allow(clippy::result_large_err)]
1436 fn send_view_event(&self, event: zng_view_api::Event) -> Result<(), AppChannelError> {
1437 self.0.send(AppEvent::ViewEvent(event)).map_err(|_| AppChannelError::Disconnected)
1438 }
1439
1440 pub fn send_update(&self, op: UpdateOp, target: impl Into<Option<WidgetId>>) -> Result<(), AppChannelError> {
1442 UpdatesTrace::log_update();
1443 self.send_app_event(AppEvent::Update(op, target.into()))
1444 .map_err(|_| AppChannelError::Disconnected)
1445 }
1446
1447 pub(crate) fn send_event(&self, event: crate::event::EventUpdateMsg) -> Result<(), AppChannelError> {
1449 self.send_app_event(AppEvent::Event(event))
1450 .map_err(|_| AppChannelError::Disconnected)
1451 }
1452
1453 pub fn send_resume_unwind(&self, payload: PanicPayload) -> Result<(), AppChannelError> {
1455 self.send_app_event(AppEvent::ResumeUnwind(payload))
1456 .map_err(|_| AppChannelError::Disconnected)
1457 }
1458
1459 pub(crate) fn send_check_update(&self) -> Result<(), AppChannelError> {
1461 self.send_app_event(AppEvent::CheckUpdate)
1462 .map_err(|_| AppChannelError::Disconnected)
1463 }
1464
1465 pub fn waker(&self, target: impl Into<Option<WidgetId>>) -> Waker {
1467 Arc::new(AppWaker(self.0.clone(), target.into())).into()
1468 }
1469
1470 pub fn ext_channel<T>(&self) -> (AppExtSender<T>, AppExtReceiver<T>) {
1472 let (sender, receiver) = flume::unbounded();
1473
1474 (
1475 AppExtSender {
1476 update: self.clone(),
1477 sender,
1478 },
1479 AppExtReceiver { receiver },
1480 )
1481 }
1482
1483 pub fn ext_channel_bounded<T>(&self, cap: usize) -> (AppExtSender<T>, AppExtReceiver<T>) {
1485 let (sender, receiver) = flume::bounded(cap);
1486
1487 (
1488 AppExtSender {
1489 update: self.clone(),
1490 sender,
1491 },
1492 AppExtReceiver { receiver },
1493 )
1494 }
1495}
1496
1497struct AppWaker(flume::Sender<AppEvent>, Option<WidgetId>);
1498impl std::task::Wake for AppWaker {
1499 fn wake(self: std::sync::Arc<Self>) {
1500 self.wake_by_ref()
1501 }
1502 fn wake_by_ref(self: &Arc<Self>) {
1503 let _ = self.0.send(AppEvent::Update(UpdateOp::Update, self.1));
1504 }
1505}
1506
1507type PanicPayload = Box<dyn std::any::Any + Send + 'static>;
1508
1509pub struct AppExtSender<T> {
1513 update: AppEventSender,
1514 sender: flume::Sender<T>,
1515}
1516impl<T> Clone for AppExtSender<T> {
1517 fn clone(&self) -> Self {
1518 Self {
1519 update: self.update.clone(),
1520 sender: self.sender.clone(),
1521 }
1522 }
1523}
1524impl<T: Send> AppExtSender<T> {
1525 pub fn send(&self, msg: T) -> Result<(), AppChannelError> {
1527 match self.update.send_update(UpdateOp::Update, None) {
1528 Ok(()) => self.sender.send(msg).map_err(|_| AppChannelError::Disconnected),
1529 Err(_) => Err(AppChannelError::Disconnected),
1530 }
1531 }
1532
1533 pub fn send_timeout(&self, msg: T, dur: Duration) -> Result<(), AppChannelError> {
1535 match self.update.send_update(UpdateOp::Update, None) {
1536 Ok(()) => self.sender.send_timeout(msg, dur).map_err(|e| match e {
1537 flume::SendTimeoutError::Timeout(_) => AppChannelError::Timeout,
1538 flume::SendTimeoutError::Disconnected(_) => AppChannelError::Disconnected,
1539 }),
1540 Err(_) => Err(AppChannelError::Disconnected),
1541 }
1542 }
1543
1544 pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), AppChannelError> {
1546 match self.update.send_update(UpdateOp::Update, None) {
1547 Ok(()) => self.sender.send_deadline(msg, deadline).map_err(|e| match e {
1548 flume::SendTimeoutError::Timeout(_) => AppChannelError::Timeout,
1549 flume::SendTimeoutError::Disconnected(_) => AppChannelError::Disconnected,
1550 }),
1551 Err(_) => Err(AppChannelError::Disconnected),
1552 }
1553 }
1554}
1555
1556pub struct AppExtReceiver<T> {
1560 receiver: flume::Receiver<T>,
1561}
1562impl<T> Clone for AppExtReceiver<T> {
1563 fn clone(&self) -> Self {
1564 Self {
1565 receiver: self.receiver.clone(),
1566 }
1567 }
1568}
1569impl<T> AppExtReceiver<T> {
1570 pub fn try_recv(&self) -> Result<T, Option<AppChannelError>> {
1575 self.receiver.try_recv().map_err(|e| match e {
1576 flume::TryRecvError::Empty => None,
1577 flume::TryRecvError::Disconnected => Some(AppChannelError::Disconnected),
1578 })
1579 }
1580}
1581
1582#[derive(Debug, Clone)]
1584#[non_exhaustive]
1585pub enum AppChannelError {
1586 Disconnected,
1588 Timeout,
1590}
1591impl fmt::Display for AppChannelError {
1592 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1593 match self {
1594 AppChannelError::Disconnected => write!(f, "cannot receive because the sender disconnected"),
1595 AppChannelError::Timeout => write!(f, "deadline elapsed before message could be send/received"),
1596 }
1597 }
1598}
1599impl std::error::Error for AppChannelError {}
1600impl From<flume::RecvTimeoutError> for AppChannelError {
1601 fn from(value: flume::RecvTimeoutError) -> Self {
1602 match value {
1603 flume::RecvTimeoutError::Timeout => AppChannelError::Timeout,
1604 flume::RecvTimeoutError::Disconnected => AppChannelError::Disconnected,
1605 }
1606 }
1607}
1608
1609event_args! {
1610 pub struct ExitRequestedArgs {
1614
1615 ..
1616
1617 fn delivery_list(&self, list: &mut UpdateDeliveryList) {
1619 list.search_all()
1620 }
1621 }
1622}
1623
1624event! {
1625 pub static EXIT_REQUESTED_EVENT: ExitRequestedArgs;
1633}
1634
1635trait ReceiverExt<T> {
1637 fn recv_deadline_sp(&self, deadline: Deadline) -> Result<T, flume::RecvTimeoutError>;
1639}
1640
1641const WORST_SLEEP_ERR: Duration = Duration::from_millis(if cfg!(windows) { 20 } else { 10 });
1642const WORST_SPIN_ERR: Duration = Duration::from_millis(if cfg!(windows) { 2 } else { 1 });
1643
1644impl<T> ReceiverExt<T> for flume::Receiver<T> {
1645 fn recv_deadline_sp(&self, deadline: Deadline) -> Result<T, flume::RecvTimeoutError> {
1646 loop {
1647 if let Some(d) = deadline.0.checked_duration_since(INSTANT.now()) {
1648 if matches!(INSTANT.mode(), zng_time::InstantMode::Manual) {
1649 match self.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
1652 Err(flume::RecvTimeoutError::Timeout) => continue, interrupt => return interrupt,
1654 }
1655 } else if d > WORST_SLEEP_ERR {
1656 #[cfg(not(target_arch = "wasm32"))]
1658 match self.recv_deadline(deadline.0.checked_sub(WORST_SLEEP_ERR).unwrap().into()) {
1659 Err(flume::RecvTimeoutError::Timeout) => continue, interrupt => return interrupt,
1661 }
1662
1663 #[cfg(target_arch = "wasm32")] match self.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
1665 Err(flume::RecvTimeoutError::Timeout) => continue, interrupt => return interrupt,
1667 }
1668 } else if d > WORST_SPIN_ERR {
1669 let spin_deadline = Deadline(deadline.0.checked_sub(WORST_SPIN_ERR).unwrap());
1670
1671 while !spin_deadline.has_elapsed() {
1673 match self.try_recv() {
1674 Err(flume::TryRecvError::Empty) => std::thread::yield_now(),
1675 Err(flume::TryRecvError::Disconnected) => return Err(flume::RecvTimeoutError::Disconnected),
1676 Ok(msg) => return Ok(msg),
1677 }
1678 }
1679 continue; } else {
1681 while !deadline.has_elapsed() {
1683 std::thread::yield_now();
1684 }
1685 return Err(flume::RecvTimeoutError::Timeout);
1686 }
1687 } else {
1688 return Err(flume::RecvTimeoutError::Timeout);
1689 }
1690 }
1691 }
1692}