1use std::{
2 collections::HashMap,
3 fmt, mem,
4 path::PathBuf,
5 sync::Arc,
6 task::Waker,
7 time::{Duration, Instant},
8};
9
10use crate::Deadline;
11use parking_lot::Mutex;
12use zng_app_context::{AppScope, app_local};
13use zng_task::DEADLINE_APP;
14use zng_time::{INSTANT_APP, InstantMode};
15use zng_txt::Txt;
16use zng_var::{ResponderVar, ResponseVar, VARS_APP, Var, response_var};
17use zng_view_api::{DeviceEventsFilter, raw_input::InputDeviceEvent};
18
19use crate::{
20 APP, AppControlFlow, AppEventObserver, AppExtension, AppExtensionsInfo, DInstant, INSTANT,
21 event::{AnyEventArgs, CommandHandle, CommandInfoExt, CommandNameExt, EVENTS, EventPropagationHandle, command, event},
22 event_args,
23 shortcut::CommandShortcutExt,
24 shortcut::shortcut,
25 timer::TimersService,
26 update::{
27 ContextUpdates, EventUpdate, InfoUpdates, LayoutUpdates, RenderUpdates, UPDATES, UpdateOp, UpdateTrace, UpdatesTrace, WidgetUpdates,
28 },
29 view_process::{raw_device_events::InputDeviceId, *},
30 widget::WidgetId,
31 window::WindowId,
32};
33
34pub(crate) struct RunningApp<E: AppExtension> {
36 extensions: (AppIntrinsic, E),
37
38 receiver: flume::Receiver<AppEvent>,
39
40 loop_timer: LoopTimer,
41 loop_monitor: LoopMonitor,
42
43 pending_view_events: Vec<zng_view_api::Event>,
44 pending_view_frame_events: Vec<zng_view_api::window::EventFrameRendered>,
45 pending: ContextUpdates,
46
47 exited: bool,
48
49 _scope: AppScope,
51}
52impl<E: AppExtension> RunningApp<E> {
53 pub(crate) fn start(
54 scope: AppScope,
55 mut extensions: E,
56 is_headed: bool,
57 with_renderer: bool,
58 view_process_exe: Option<PathBuf>,
59 view_process_env: HashMap<Txt, Txt>,
60 ) -> Self {
61 let _s = tracing::debug_span!("APP::start").entered();
62
63 let (sender, receiver) = AppEventSender::new();
64
65 UPDATES.init(sender);
66
67 fn app_waker() {
68 UPDATES.update(None);
69 }
70 VARS_APP.init_app_waker(app_waker);
71 VARS_APP.init_modify_trace(UpdatesTrace::log_var);
72 DEADLINE_APP.init_deadline_service(crate::timer::deadline_service);
73 zng_var::animation::TRANSITIONABLE_APP.init_rgba_lerp(zng_color::lerp_rgba);
74
75 let mut info = AppExtensionsInfo::start();
76 {
77 let _t = INSTANT_APP.pause_for_update();
78 extensions.register(&mut info);
79 }
80
81 {
82 let mut sv = APP_PROCESS_SV.write();
83 sv.set_extensions(info);
84 }
85
86 if with_renderer && view_process_exe.is_none() {
87 zng_env::assert_inited();
88 }
89
90 #[cfg(not(target_arch = "wasm32"))]
91 let view_process_exe = view_process_exe.unwrap_or_else(|| std::env::current_exe().expect("current_exe"));
92 #[cfg(target_arch = "wasm32")]
93 let view_process_exe = std::path::PathBuf::from("<wasm>");
94
95 let process = AppIntrinsic::pre_init(is_headed, with_renderer, view_process_exe, view_process_env);
96
97 {
98 let _s = tracing::debug_span!("extensions.init").entered();
99 extensions.init();
100 }
101
102 let args = AppStartArgs { _private: () };
103 for h in zng_unique_id::hot_static_ref!(ON_APP_START).lock().iter_mut() {
104 h(&args)
105 }
106
107 RunningApp {
108 extensions: (process, extensions),
109
110 receiver,
111
112 loop_timer: LoopTimer::default(),
113 loop_monitor: LoopMonitor::default(),
114
115 pending_view_events: Vec::with_capacity(100),
116 pending_view_frame_events: Vec::with_capacity(5),
117 pending: ContextUpdates {
118 events: Vec::with_capacity(100),
119 update: false,
120 info: false,
121 layout: false,
122 render: false,
123 update_widgets: WidgetUpdates::default(),
124 info_widgets: InfoUpdates::default(),
125 layout_widgets: LayoutUpdates::default(),
126 render_widgets: RenderUpdates::default(),
127 render_update_widgets: RenderUpdates::default(),
128 },
129 exited: false,
130
131 _scope: scope,
132 }
133 }
134
135 pub fn has_exited(&self) -> bool {
136 self.exited
137 }
138
139 pub fn notify_event<O: AppEventObserver>(&mut self, mut update: EventUpdate, observer: &mut O) {
141 let _scope = tracing::trace_span!("notify_event", event = update.event().name()).entered();
142
143 let _t = INSTANT_APP.pause_for_update();
144
145 update.event().on_update(&mut update);
146
147 self.extensions.event_preview(&mut update);
148 observer.event_preview(&mut update);
149 update.call_pre_actions();
150
151 self.extensions.event_ui(&mut update);
152 observer.event_ui(&mut update);
153
154 self.extensions.event(&mut update);
155 observer.event(&mut update);
156 update.call_pos_actions();
157 }
158
159 fn input_device_id(&mut self, id: zng_view_api::raw_input::InputDeviceId) -> InputDeviceId {
160 VIEW_PROCESS.input_device_id(id)
161 }
162
163 fn on_view_event<O: AppEventObserver>(&mut self, ev: zng_view_api::Event, observer: &mut O) {
165 use crate::view_process::raw_device_events::*;
166 use crate::view_process::raw_events::*;
167 use zng_view_api::Event;
168
169 fn window_id(id: zng_view_api::window::WindowId) -> WindowId {
170 WindowId::from_raw(id.get())
171 }
172
173 match ev {
174 Event::MouseMoved {
175 window: w_id,
176 device: d_id,
177 coalesced_pos,
178 position,
179 } => {
180 let args = RawMouseMovedArgs::now(window_id(w_id), self.input_device_id(d_id), coalesced_pos, position);
181 self.notify_event(RAW_MOUSE_MOVED_EVENT.new_update(args), observer);
182 }
183 Event::MouseEntered {
184 window: w_id,
185 device: d_id,
186 } => {
187 let args = RawMouseArgs::now(window_id(w_id), self.input_device_id(d_id));
188 self.notify_event(RAW_MOUSE_ENTERED_EVENT.new_update(args), observer);
189 }
190 Event::MouseLeft {
191 window: w_id,
192 device: d_id,
193 } => {
194 let args = RawMouseArgs::now(window_id(w_id), self.input_device_id(d_id));
195 self.notify_event(RAW_MOUSE_LEFT_EVENT.new_update(args), observer);
196 }
197 Event::WindowChanged(c) => {
198 let monitor_id = c.monitor.map(|id| VIEW_PROCESS.monitor_id(id));
199 let args = RawWindowChangedArgs::now(
200 window_id(c.window),
201 c.state,
202 c.position,
203 monitor_id,
204 c.size,
205 c.safe_padding,
206 c.cause,
207 c.frame_wait_id,
208 );
209 self.notify_event(RAW_WINDOW_CHANGED_EVENT.new_update(args), observer);
210 }
211 Event::DragHovered { window, data, allowed } => {
212 let args = RawDragHoveredArgs::now(window_id(window), data, allowed);
213 self.notify_event(RAW_DRAG_HOVERED_EVENT.new_update(args), observer);
214 }
215 Event::DragMoved {
216 window,
217 coalesced_pos,
218 position,
219 } => {
220 let args = RawDragMovedArgs::now(window_id(window), coalesced_pos, position);
221 self.notify_event(RAW_DRAG_MOVED_EVENT.new_update(args), observer);
222 }
223 Event::DragDropped {
224 window,
225 data,
226 allowed,
227 drop_id,
228 } => {
229 let args = RawDragDroppedArgs::now(window_id(window), data, allowed, drop_id);
230 self.notify_event(RAW_DRAG_DROPPED_EVENT.new_update(args), observer);
231 }
232 Event::DragCancelled { window } => {
233 let args = RawDragCancelledArgs::now(window_id(window));
234 self.notify_event(RAW_DRAG_CANCELLED_EVENT.new_update(args), observer);
235 }
236 Event::AppDragEnded { window, drag, applied } => {
237 let args = RawAppDragEndedArgs::now(window_id(window), drag, applied);
238 self.notify_event(RAW_APP_DRAG_ENDED_EVENT.new_update(args), observer);
239 }
240 Event::FocusChanged { prev, new } => {
241 let args = RawWindowFocusArgs::now(prev.map(window_id), new.map(window_id));
242 self.notify_event(RAW_WINDOW_FOCUS_EVENT.new_update(args), observer);
243 }
244 Event::KeyboardInput {
245 window: w_id,
246 device: d_id,
247 key_code,
248 state,
249 key,
250 key_location,
251 key_modified,
252 text,
253 } => {
254 let args = RawKeyInputArgs::now(
255 window_id(w_id),
256 self.input_device_id(d_id),
257 key_code,
258 key_location,
259 state,
260 key,
261 key_modified,
262 text,
263 );
264 self.notify_event(RAW_KEY_INPUT_EVENT.new_update(args), observer);
265 }
266 Event::Ime { window: w_id, ime } => {
267 let args = RawImeArgs::now(window_id(w_id), ime);
268 self.notify_event(RAW_IME_EVENT.new_update(args), observer);
269 }
270
271 Event::MouseWheel {
272 window: w_id,
273 device: d_id,
274 delta,
275 phase,
276 } => {
277 let args = RawMouseWheelArgs::now(window_id(w_id), self.input_device_id(d_id), delta, phase);
278 self.notify_event(RAW_MOUSE_WHEEL_EVENT.new_update(args), observer);
279 }
280 Event::MouseInput {
281 window: w_id,
282 device: d_id,
283 state,
284 button,
285 } => {
286 let args = RawMouseInputArgs::now(window_id(w_id), self.input_device_id(d_id), state, button);
287 self.notify_event(RAW_MOUSE_INPUT_EVENT.new_update(args), observer);
288 }
289 Event::TouchpadPressure {
290 window: w_id,
291 device: d_id,
292 pressure,
293 stage,
294 } => {
295 let args = RawTouchpadPressureArgs::now(window_id(w_id), self.input_device_id(d_id), pressure, stage);
296 self.notify_event(RAW_TOUCHPAD_PRESSURE_EVENT.new_update(args), observer);
297 }
298 Event::AxisMotion {
299 window: w_id,
300 device: d_id,
301 axis,
302 value,
303 } => {
304 let args = RawAxisMotionArgs::now(window_id(w_id), self.input_device_id(d_id), axis, value);
305 self.notify_event(RAW_AXIS_MOTION_EVENT.new_update(args), observer);
306 }
307 Event::Touch {
308 window: w_id,
309 device: d_id,
310 touches,
311 } => {
312 let args = RawTouchArgs::now(window_id(w_id), self.input_device_id(d_id), touches);
313 self.notify_event(RAW_TOUCH_EVENT.new_update(args), observer);
314 }
315 Event::ScaleFactorChanged {
316 monitor: id,
317 windows,
318 scale_factor,
319 } => {
320 let monitor_id = VIEW_PROCESS.monitor_id(id);
321 let windows: Vec<_> = windows.into_iter().map(window_id).collect();
322 let args = RawScaleFactorChangedArgs::now(monitor_id, windows, scale_factor);
323 self.notify_event(RAW_SCALE_FACTOR_CHANGED_EVENT.new_update(args), observer);
324 }
325 Event::MonitorsChanged(monitors) => {
326 let monitors: Vec<_> = monitors.into_iter().map(|(id, info)| (VIEW_PROCESS.monitor_id(id), info)).collect();
327 let args = RawMonitorsChangedArgs::now(monitors);
328 self.notify_event(RAW_MONITORS_CHANGED_EVENT.new_update(args), observer);
329 }
330 Event::AudioDevicesChanged(_audio_devices) => {
331 }
333 Event::WindowCloseRequested(w_id) => {
334 let args = RawWindowCloseRequestedArgs::now(window_id(w_id));
335 self.notify_event(RAW_WINDOW_CLOSE_REQUESTED_EVENT.new_update(args), observer);
336 }
337 Event::WindowOpened(w_id, data) => {
338 let w_id = window_id(w_id);
339 let (window, data) = VIEW_PROCESS.on_window_opened(w_id, data);
340 let args = RawWindowOpenArgs::now(w_id, window, data);
341 self.notify_event(RAW_WINDOW_OPEN_EVENT.new_update(args), observer);
342 }
343 Event::HeadlessOpened(w_id, data) => {
344 let w_id = window_id(w_id);
345 let (surface, data) = VIEW_PROCESS.on_headless_opened(w_id, data);
346 let args = RawHeadlessOpenArgs::now(w_id, surface, data);
347 self.notify_event(RAW_HEADLESS_OPEN_EVENT.new_update(args), observer);
348 }
349 Event::WindowOrHeadlessOpenError { id: w_id, error } => {
350 let w_id = window_id(w_id);
351 let args = RawWindowOrHeadlessOpenErrorArgs::now(w_id, error);
352 self.notify_event(RAW_WINDOW_OR_HEADLESS_OPEN_ERROR_EVENT.new_update(args), observer);
353 }
354 Event::WindowClosed(w_id) => {
355 let args = RawWindowCloseArgs::now(window_id(w_id));
356 self.notify_event(RAW_WINDOW_CLOSE_EVENT.new_update(args), observer);
357 }
358 Event::ImageMetadataLoaded {
359 image: id,
360 size,
361 ppi,
362 is_mask,
363 } => {
364 if let Some(img) = VIEW_PROCESS.on_image_metadata_loaded(id, size, ppi, is_mask) {
365 let args = RawImageArgs::now(img);
366 self.notify_event(RAW_IMAGE_METADATA_LOADED_EVENT.new_update(args), observer);
367 }
368 }
369 Event::ImagePartiallyLoaded {
370 image: id,
371 partial_size,
372 ppi,
373 is_opaque,
374 is_mask,
375 partial_pixels: partial_bgra8,
376 } => {
377 if let Some(img) = VIEW_PROCESS.on_image_partially_loaded(id, partial_size, ppi, is_opaque, is_mask, partial_bgra8) {
378 let args = RawImageArgs::now(img);
379 self.notify_event(RAW_IMAGE_PARTIALLY_LOADED_EVENT.new_update(args), observer);
380 }
381 }
382 Event::ImageLoaded(image) => {
383 if let Some(img) = VIEW_PROCESS.on_image_loaded(image) {
384 let args = RawImageArgs::now(img);
385 self.notify_event(RAW_IMAGE_LOADED_EVENT.new_update(args), observer);
386 }
387 }
388 Event::ImageLoadError { image: id, error } => {
389 if let Some(img) = VIEW_PROCESS.on_image_error(id, error) {
390 let args = RawImageArgs::now(img);
391 self.notify_event(RAW_IMAGE_LOAD_ERROR_EVENT.new_update(args), observer);
392 }
393 }
394 Event::ImageEncoded { image: id, format, data } => VIEW_PROCESS.on_image_encoded(id, format, data),
395 Event::ImageEncodeError { image: id, format, error } => {
396 VIEW_PROCESS.on_image_encode_error(id, format, error);
397 }
398 Event::FrameImageReady {
399 window: w_id,
400 frame: frame_id,
401 image: image_id,
402 selection,
403 } => {
404 if let Some(img) = VIEW_PROCESS.on_frame_image_ready(image_id) {
405 let args = RawFrameImageReadyArgs::now(img, window_id(w_id), frame_id, selection);
406 self.notify_event(RAW_FRAME_IMAGE_READY_EVENT.new_update(args), observer);
407 }
408 }
409
410 Event::AccessInit { window: w_id } => {
411 self.notify_event(crate::access::on_access_init(window_id(w_id)), observer);
412 }
413 Event::AccessCommand {
414 window: win_id,
415 target: wgt_id,
416 command,
417 } => {
418 if let Some(update) = crate::access::on_access_command(window_id(win_id), WidgetId::from_raw(wgt_id.0), command) {
419 self.notify_event(update, observer);
420 }
421 }
422 Event::AccessDeinit { window: w_id } => {
423 self.notify_event(crate::access::on_access_deinit(window_id(w_id)), observer);
424 }
425
426 Event::MsgDialogResponse(id, response) => {
428 VIEW_PROCESS.on_message_dlg_response(id, response);
429 }
430 Event::FileDialogResponse(id, response) => {
431 VIEW_PROCESS.on_file_dlg_response(id, response);
432 }
433
434 Event::ExtensionEvent(id, payload) => {
436 let args = RawExtensionEventArgs::now(id, payload);
437 self.notify_event(RAW_EXTENSION_EVENT.new_update(args), observer);
438 }
439
440 Event::FontsChanged => {
442 let args = RawFontChangedArgs::now();
443 self.notify_event(RAW_FONT_CHANGED_EVENT.new_update(args), observer);
444 }
445 Event::FontAaChanged(aa) => {
446 let args = RawFontAaChangedArgs::now(aa);
447 self.notify_event(RAW_FONT_AA_CHANGED_EVENT.new_update(args), observer);
448 }
449 Event::MultiClickConfigChanged(cfg) => {
450 let args = RawMultiClickConfigChangedArgs::now(cfg);
451 self.notify_event(RAW_MULTI_CLICK_CONFIG_CHANGED_EVENT.new_update(args), observer);
452 }
453 Event::AnimationsConfigChanged(cfg) => {
454 VARS_APP.set_sys_animations_enabled(cfg.enabled);
455 let args = RawAnimationsConfigChangedArgs::now(cfg);
456 self.notify_event(RAW_ANIMATIONS_CONFIG_CHANGED_EVENT.new_update(args), observer);
457 }
458 Event::KeyRepeatConfigChanged(cfg) => {
459 let args = RawKeyRepeatConfigChangedArgs::now(cfg);
460 self.notify_event(RAW_KEY_REPEAT_CONFIG_CHANGED_EVENT.new_update(args), observer);
461 }
462 Event::TouchConfigChanged(cfg) => {
463 let args = RawTouchConfigChangedArgs::now(cfg);
464 self.notify_event(RAW_TOUCH_CONFIG_CHANGED_EVENT.new_update(args), observer);
465 }
466 Event::LocaleChanged(cfg) => {
467 let args = RawLocaleChangedArgs::now(cfg);
468 self.notify_event(RAW_LOCALE_CONFIG_CHANGED_EVENT.new_update(args), observer);
469 }
470 Event::ColorsConfigChanged(cfg) => {
471 let args = RawColorsConfigChangedArgs::now(cfg);
472 self.notify_event(RAW_COLORS_CONFIG_CHANGED_EVENT.new_update(args), observer);
473 }
474 Event::ChromeConfigChanged(cfg) => {
475 let args = RawChromeConfigChangedArgs::now(cfg);
476 self.notify_event(RAW_CHROME_CONFIG_CHANGED_EVENT.new_update(args), observer);
477 }
478
479 Event::InputDevicesChanged(devices) => {
481 let devices: HashMap<_, _> = devices.into_iter().map(|(d_id, info)| (self.input_device_id(d_id), info)).collect();
482 INPUT_DEVICES.update(devices.clone());
483 let args = InputDevicesChangedArgs::now(devices);
484 self.notify_event(INPUT_DEVICES_CHANGED_EVENT.new_update(args), observer);
485 }
486 Event::InputDeviceEvent { device, event } => {
487 let d_id = self.input_device_id(device);
488 match event {
489 InputDeviceEvent::PointerMotion { delta } => {
490 let args = PointerMotionArgs::now(d_id, delta);
491 self.notify_event(POINTER_MOTION_EVENT.new_update(args), observer);
492 }
493 InputDeviceEvent::ScrollMotion { delta } => {
494 let args = ScrollMotionArgs::now(d_id, delta);
495 self.notify_event(SCROLL_MOTION_EVENT.new_update(args), observer);
496 }
497 InputDeviceEvent::AxisMotion { axis, value } => {
498 let args = AxisMotionArgs::now(d_id, axis, value);
499 self.notify_event(AXIS_MOTION_EVENT.new_update(args), observer);
500 }
501 InputDeviceEvent::Button { button, state } => {
502 let args = ButtonArgs::now(d_id, button, state);
503 self.notify_event(BUTTON_EVENT.new_update(args), observer);
504 }
505 InputDeviceEvent::Key { key_code, state } => {
506 let args = KeyArgs::now(d_id, key_code, state);
507 self.notify_event(KEY_EVENT.new_update(args), observer);
508 }
509 _ => {}
510 }
511 }
512
513 Event::LowMemory => {
514 LOW_MEMORY_EVENT.notify(LowMemoryArgs::now());
515 }
516
517 Event::RecoveredFromComponentPanic { component, recover, panic } => {
518 tracing::error!(
519 "view-process recovered from internal component panic\n component: {component}\n recover: {recover}\n```panic\n{panic}\n```"
520 );
521 }
522
523 Event::Inited(zng_view_api::Inited { .. }) | Event::Suspended | Event::Disconnected(_) | Event::FrameRendered(_) => {
525 unreachable!()
526 } _ => {}
529 }
530 }
531
532 fn on_view_rendered_event<O: AppEventObserver>(&mut self, ev: zng_view_api::window::EventFrameRendered, observer: &mut O) {
534 debug_assert!(ev.window != zng_view_api::window::WindowId::INVALID);
535 let window_id = WindowId::from_raw(ev.window.get());
536 let image = ev.frame_image.map(|img| VIEW_PROCESS.on_frame_image(img));
538 let args = crate::view_process::raw_events::RawFrameRenderedArgs::now(window_id, ev.frame, image);
539 self.notify_event(crate::view_process::raw_events::RAW_FRAME_RENDERED_EVENT.new_update(args), observer);
540 }
541
542 pub(crate) fn run_headed(mut self) {
543 let mut observer = ();
544 #[cfg(feature = "dyn_app_extension")]
545 let mut observer = observer.as_dyn();
546
547 self.apply_updates(&mut observer);
548 self.apply_update_events(&mut observer);
549 let mut wait = false;
550 loop {
551 wait = match self.poll_impl(wait, &mut observer) {
552 AppControlFlow::Poll => false,
553 AppControlFlow::Wait => true,
554 AppControlFlow::Exit => break,
555 };
556 }
557 }
558
559 fn push_coalesce<O: AppEventObserver>(&mut self, ev: AppEvent, observer: &mut O) {
560 match ev {
561 AppEvent::ViewEvent(ev) => match ev {
562 zng_view_api::Event::FrameRendered(ev) => {
563 if ev.window == zng_view_api::window::WindowId::INVALID {
564 tracing::error!("ignored rendered event for invalid window id, {ev:?}");
565 return;
566 }
567
568 let window = WindowId::from_raw(ev.window.get());
569
570 {
572 if VIEW_PROCESS.is_available() {
573 VIEW_PROCESS.on_frame_rendered(window);
574 }
575 }
576
577 #[cfg(debug_assertions)]
578 if self.pending_view_frame_events.iter().any(|e| e.window == ev.window) {
579 tracing::warn!("window `{window:?}` probably sent a frame request without awaiting renderer idle");
580 }
581
582 self.pending_view_frame_events.push(ev);
583 }
584 zng_view_api::Event::Inited(zng_view_api::Inited {
585 generation,
586 is_respawn,
587 extensions,
588 ..
589 }) => {
590 if is_respawn {
592 VIEW_PROCESS.on_respawned(generation);
593 APP_PROCESS_SV.read().is_suspended.set(false);
594 }
595
596 VIEW_PROCESS.handle_inited(generation, extensions.clone());
597
598 let args = crate::view_process::ViewProcessInitedArgs::now(generation, is_respawn, extensions);
599 self.notify_event(VIEW_PROCESS_INITED_EVENT.new_update(args), observer);
600 }
601 zng_view_api::Event::Suspended => {
602 VIEW_PROCESS.handle_suspended();
603 let args = crate::view_process::ViewProcessSuspendedArgs::now();
604 self.notify_event(VIEW_PROCESS_SUSPENDED_EVENT.new_update(args), observer);
605 APP_PROCESS_SV.read().is_suspended.set(true);
606 }
607 zng_view_api::Event::Disconnected(vp_gen) => {
608 VIEW_PROCESS.handle_disconnect(vp_gen);
610 }
611 ev => {
612 if let Some(last) = self.pending_view_events.last_mut() {
613 match last.coalesce(ev) {
614 Ok(()) => {}
615 Err(ev) => self.pending_view_events.push(ev),
616 }
617 } else {
618 self.pending_view_events.push(ev);
619 }
620 }
621 },
622 AppEvent::Event(ev) => EVENTS.notify(ev.get()),
623 AppEvent::Update(op, target) => {
624 UPDATES.update_op(op, target);
625 }
626 AppEvent::CheckUpdate => {}
627 AppEvent::ResumeUnwind(p) => std::panic::resume_unwind(p),
628 }
629 }
630
631 fn has_pending_updates(&mut self) -> bool {
632 !self.pending_view_events.is_empty() || self.pending.has_updates() || UPDATES.has_pending_updates() || !self.receiver.is_empty()
633 }
634
635 pub(crate) fn poll<O: AppEventObserver>(&mut self, wait_app_event: bool, observer: &mut O) -> AppControlFlow {
636 #[cfg(feature = "dyn_app_extension")]
637 let mut observer = observer.as_dyn();
638 #[cfg(feature = "dyn_app_extension")]
639 let observer = &mut observer;
640 self.poll_impl(wait_app_event, observer)
641 }
642 fn poll_impl<O: AppEventObserver>(&mut self, wait_app_event: bool, observer: &mut O) -> AppControlFlow {
643 let mut disconnected = false;
644
645 if self.exited {
646 return AppControlFlow::Exit;
647 }
648
649 if wait_app_event {
650 let idle = tracing::debug_span!("<idle>", ended_by = tracing::field::Empty).entered();
651
652 let timer = if self.view_is_busy() { None } else { self.loop_timer.poll() };
653 if let Some(time) = timer {
654 match self.receiver.recv_deadline_sp(time) {
655 Ok(ev) => {
656 idle.record("ended_by", "event");
657 drop(idle);
658 self.push_coalesce(ev, observer)
659 }
660 Err(e) => match e {
661 flume::RecvTimeoutError::Timeout => {
662 idle.record("ended_by", "timeout");
663 }
664 flume::RecvTimeoutError::Disconnected => {
665 idle.record("ended_by", "disconnected");
666 disconnected = true
667 }
668 },
669 }
670 } else {
671 match self.receiver.recv() {
672 Ok(ev) => {
673 idle.record("ended_by", "event");
674 drop(idle);
675 self.push_coalesce(ev, observer)
676 }
677 Err(e) => match e {
678 flume::RecvError::Disconnected => {
679 idle.record("ended_by", "disconnected");
680 disconnected = true
681 }
682 },
683 }
684 }
685 }
686 loop {
687 match self.receiver.try_recv() {
688 Ok(ev) => self.push_coalesce(ev, observer),
689 Err(e) => match e {
690 flume::TryRecvError::Empty => break,
691 flume::TryRecvError::Disconnected => {
692 disconnected = true;
693 break;
694 }
695 },
696 }
697 }
698 if disconnected {
699 panic!("app events channel disconnected");
700 }
701
702 if self.view_is_busy() {
703 return AppControlFlow::Wait;
704 }
705
706 UPDATES.on_app_awake();
707
708 let updated_timers = self.loop_timer.awake();
710 if updated_timers {
711 UPDATES.update_timers(&mut self.loop_timer);
713 self.apply_updates(observer);
714 }
715
716 let mut events = mem::take(&mut self.pending_view_events);
717 for ev in events.drain(..) {
718 self.on_view_event(ev, observer);
719 self.apply_updates(observer);
720 }
721 debug_assert!(self.pending_view_events.is_empty());
722 self.pending_view_events = events; let mut events = mem::take(&mut self.pending_view_frame_events);
725 for ev in events.drain(..) {
726 self.on_view_rendered_event(ev, observer);
727 }
728 self.pending_view_frame_events = events;
729
730 if self.has_pending_updates() {
731 self.apply_updates(observer);
732 self.apply_update_events(observer);
733 }
734
735 if self.view_is_busy() {
736 return AppControlFlow::Wait;
737 }
738
739 self.finish_frame(observer);
740
741 UPDATES.next_deadline(&mut self.loop_timer);
742
743 if self.extensions.0.exit() {
744 UPDATES.on_app_sleep();
745 self.exited = true;
746 AppControlFlow::Exit
747 } else if self.has_pending_updates() || UPDATES.has_pending_layout_or_render() {
748 AppControlFlow::Poll
749 } else {
750 UPDATES.on_app_sleep();
751 AppControlFlow::Wait
752 }
753 }
754
755 fn apply_updates<O: AppEventObserver>(&mut self, observer: &mut O) {
757 let _s = tracing::debug_span!("apply_updates").entered();
758
759 let mut run = true;
760 while run {
761 run = self.loop_monitor.update(|| {
762 let mut any = false;
763
764 self.pending |= UPDATES.apply_info();
765 if mem::take(&mut self.pending.info) {
766 any = true;
767 let _s = tracing::debug_span!("info").entered();
768
769 let mut info_widgets = mem::take(&mut self.pending.info_widgets);
770
771 let _t = INSTANT_APP.pause_for_update();
772
773 {
774 let _s = tracing::debug_span!("ext.info").entered();
775 self.extensions.info(&mut info_widgets);
776 }
777 {
778 let _s = tracing::debug_span!("obs.info").entered();
779 observer.info(&mut info_widgets);
780 }
781 }
782
783 self.pending |= UPDATES.apply_updates();
784 TimersService::notify();
785 if mem::take(&mut self.pending.update) {
786 any = true;
787 let _s = tracing::debug_span!("update").entered();
788
789 let mut update_widgets = mem::take(&mut self.pending.update_widgets);
790
791 let _t = INSTANT_APP.pause_for_update();
792
793 {
794 let _s = tracing::debug_span!("ext.update_preview").entered();
795 self.extensions.update_preview();
796 }
797 {
798 let _s = tracing::debug_span!("obs.update_preview").entered();
799 observer.update_preview();
800 }
801 UPDATES.on_pre_updates();
802
803 {
804 let _s = tracing::debug_span!("ext.update_ui").entered();
805 self.extensions.update_ui(&mut update_widgets);
806 }
807 {
808 let _s = tracing::debug_span!("obs.update_ui").entered();
809 observer.update_ui(&mut update_widgets);
810 }
811
812 {
813 let _s = tracing::debug_span!("ext.update").entered();
814 self.extensions.update();
815 }
816 {
817 let _s = tracing::debug_span!("obs.update").entered();
818 observer.update();
819 }
820 UPDATES.on_updates();
821 }
822
823 any
824 });
825 }
826 }
827
828 fn apply_update_events<O: AppEventObserver>(&mut self, observer: &mut O) {
830 let _s = tracing::debug_span!("apply_update_events").entered();
831
832 loop {
833 let events: Vec<_> = self.pending.events.drain(..).collect();
834 if events.is_empty() {
835 break;
836 }
837 for mut update in events {
838 let _s = tracing::debug_span!("update_event", ?update).entered();
839
840 self.loop_monitor.maybe_trace(|| {
841 let _t = INSTANT_APP.pause_for_update();
842
843 {
844 let _s = tracing::debug_span!("ext.event_preview").entered();
845 self.extensions.event_preview(&mut update);
846 }
847 {
848 let _s = tracing::debug_span!("obs.event_preview").entered();
849 observer.event_preview(&mut update);
850 }
851 update.call_pre_actions();
852
853 {
854 let _s = tracing::debug_span!("ext.event_ui").entered();
855 self.extensions.event_ui(&mut update);
856 }
857 {
858 let _s = tracing::debug_span!("obs.event_ui").entered();
859 observer.event_ui(&mut update);
860 }
861 {
862 let _s = tracing::debug_span!("ext.event").entered();
863 self.extensions.event(&mut update);
864 }
865 {
866 let _s = tracing::debug_span!("obs.event").entered();
867 observer.event(&mut update);
868 }
869 update.call_pos_actions();
870 });
871
872 self.apply_updates(observer);
873 }
874 }
875 }
876
877 fn view_is_busy(&mut self) -> bool {
878 VIEW_PROCESS.is_available() && VIEW_PROCESS.pending_frames() > 0
879 }
880
881 fn finish_frame<O: AppEventObserver>(&mut self, observer: &mut O) {
883 debug_assert!(!self.view_is_busy());
884
885 self.pending |= UPDATES.apply_layout_render();
886
887 while mem::take(&mut self.pending.layout) {
888 let _s = tracing::debug_span!("apply_layout").entered();
889
890 let mut layout_widgets = mem::take(&mut self.pending.layout_widgets);
891
892 self.loop_monitor.maybe_trace(|| {
893 let _t = INSTANT_APP.pause_for_update();
894
895 {
896 let _s = tracing::debug_span!("ext.layout").entered();
897 self.extensions.layout(&mut layout_widgets);
898 }
899 {
900 let _s = tracing::debug_span!("obs.layout").entered();
901 observer.layout(&mut layout_widgets);
902 }
903 });
904
905 self.apply_updates(observer);
906 self.apply_update_events(observer);
907 self.pending |= UPDATES.apply_layout_render();
908 }
909
910 if mem::take(&mut self.pending.render) {
911 let _s = tracing::debug_span!("apply_render").entered();
912
913 let mut render_widgets = mem::take(&mut self.pending.render_widgets);
914 let mut render_update_widgets = mem::take(&mut self.pending.render_update_widgets);
915
916 let _t = INSTANT_APP.pause_for_update();
917
918 {
919 let _s = tracing::debug_span!("ext.render").entered();
920 self.extensions.render(&mut render_widgets, &mut render_update_widgets);
921 }
922 {
923 let _s = tracing::debug_span!("obs.render").entered();
924 observer.render(&mut render_widgets, &mut render_update_widgets);
925 }
926 }
927
928 self.loop_monitor.finish_frame();
929 }
930}
931impl<E: AppExtension> Drop for RunningApp<E> {
932 fn drop(&mut self) {
933 let _s = tracing::debug_span!("ext.deinit").entered();
934 self.extensions.deinit();
935 VIEW_PROCESS.exit();
936 }
937}
938
939pub struct AppStartArgs {
944 _private: (),
945}
946
947pub fn on_app_start(handler: impl FnMut(&AppStartArgs) + Send + 'static) {
953 zng_unique_id::hot_static_ref!(ON_APP_START).lock().push(Box::new(handler))
954}
955zng_unique_id::hot_static! {
956 static ON_APP_START: Mutex<Vec<AppStartHandler>> = Mutex::new(vec![]);
957}
958type AppStartHandler = Box<dyn FnMut(&AppStartArgs) + Send + 'static>;
959
960#[derive(Debug)]
962pub(crate) struct LoopTimer {
963 now: DInstant,
964 deadline: Option<Deadline>,
965}
966impl Default for LoopTimer {
967 fn default() -> Self {
968 Self {
969 now: INSTANT.now(),
970 deadline: None,
971 }
972 }
973}
974impl LoopTimer {
975 pub fn elapsed(&mut self, deadline: Deadline) -> bool {
978 if deadline.0 <= self.now {
979 true
980 } else {
981 self.register(deadline);
982 false
983 }
984 }
985
986 pub fn register(&mut self, deadline: Deadline) {
988 if let Some(d) = &mut self.deadline {
989 if deadline < *d {
990 *d = deadline;
991 }
992 } else {
993 self.deadline = Some(deadline)
994 }
995 }
996
997 pub(crate) fn poll(&mut self) -> Option<Deadline> {
999 self.deadline
1000 }
1001
1002 pub(crate) fn awake(&mut self) -> bool {
1004 self.now = INSTANT.now();
1005 if let Some(d) = self.deadline
1006 && d.0 <= self.now
1007 {
1008 self.deadline = None;
1009 return true;
1010 }
1011 false
1012 }
1013
1014 pub fn now(&self) -> DInstant {
1016 self.now
1017 }
1018}
1019impl zng_var::animation::AnimationTimer for LoopTimer {
1020 fn elapsed(&mut self, deadline: Deadline) -> bool {
1021 self.elapsed(deadline)
1022 }
1023
1024 fn register(&mut self, deadline: Deadline) {
1025 self.register(deadline)
1026 }
1027
1028 fn now(&self) -> DInstant {
1029 self.now()
1030 }
1031}
1032
1033#[derive(Default)]
1034struct LoopMonitor {
1035 update_count: u16,
1036 skipped: bool,
1037 trace: Vec<UpdateTrace>,
1038}
1039impl LoopMonitor {
1040 pub fn update(&mut self, update_once: impl FnOnce() -> bool) -> bool {
1042 self.update_count += 1;
1043
1044 if self.update_count < 500 {
1045 update_once()
1046 } else if self.update_count < 1000 {
1047 UpdatesTrace::collect_trace(&mut self.trace, update_once)
1048 } else if self.update_count == 1000 {
1049 self.skipped = true;
1050 let trace = UpdatesTrace::format_trace(mem::take(&mut self.trace));
1051 tracing::error!(
1052 "updated 1000 times without rendering, probably stuck in an infinite loop\n\
1053 will start skipping updates to render and poll system events\n\
1054 top 20 most frequent update requests (in 500 cycles):\n\
1055 {trace}\n\
1056 you can use `UpdatesTraceUiNodeExt` and `updates_trace_event` to refine the trace"
1057 );
1058 false
1059 } else if self.update_count == 1500 {
1060 self.update_count = 1001;
1061 false
1062 } else {
1063 update_once()
1064 }
1065 }
1066
1067 pub fn maybe_trace(&mut self, notify_once: impl FnOnce()) {
1068 if (500..1000).contains(&self.update_count) {
1069 UpdatesTrace::collect_trace(&mut self.trace, notify_once);
1070 } else {
1071 notify_once();
1072 }
1073 }
1074
1075 pub fn finish_frame(&mut self) {
1076 if !self.skipped {
1077 self.skipped = false;
1078 self.update_count = 0;
1079 self.trace = vec![];
1080 }
1081 }
1082}
1083
1084impl APP {
1085 pub fn exit(&self) -> ResponseVar<ExitCancelled> {
1094 APP_PROCESS_SV.write().exit()
1095 }
1096
1097 pub fn is_suspended(&self) -> Var<bool> {
1105 APP_PROCESS_SV.read().is_suspended.read_only()
1106 }
1107}
1108
1109impl APP {
1113 pub fn pause_time_for_update(&self) -> Var<bool> {
1119 APP_PROCESS_SV.read().pause_time_for_updates.clone()
1120 }
1121
1122 pub fn start_manual_time(&self) {
1130 INSTANT_APP.set_mode(InstantMode::Manual);
1131 INSTANT_APP.set_now(INSTANT.now());
1132 UPDATES.update(None);
1133 }
1134
1135 pub fn advance_manual_time(&self, advance: Duration) {
1146 INSTANT_APP.advance_now(advance);
1147 UPDATES.update(None);
1148 }
1149
1150 pub fn set_manual_time(&self, now: DInstant) {
1159 INSTANT_APP.set_now(now);
1160 UPDATES.update(None);
1161 }
1162
1163 pub fn end_manual_time(&self) {
1165 INSTANT_APP.set_mode(match APP.pause_time_for_update().get() {
1166 true => InstantMode::UpdatePaused,
1167 false => InstantMode::Now,
1168 });
1169 UPDATES.update(None);
1170 }
1171}
1172
1173command! {
1174 pub static EXIT_CMD = {
1178 l10n!: true,
1179 name: "Exit",
1180 info: "Close all windows and exit",
1181 shortcut: shortcut!(Exit),
1182 };
1183}
1184
1185#[derive(Clone, Copy, Debug, PartialEq, Eq)]
1189pub struct ExitCancelled;
1190impl fmt::Display for ExitCancelled {
1191 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1192 write!(f, "exit request cancelled")
1193 }
1194}
1195
1196struct AppIntrinsic {
1197 exit_handle: CommandHandle,
1198 pending_exit: Option<PendingExit>,
1199}
1200struct PendingExit {
1201 handle: EventPropagationHandle,
1202 response: ResponderVar<ExitCancelled>,
1203}
1204impl AppIntrinsic {
1205 pub(super) fn pre_init(is_headed: bool, with_renderer: bool, view_process_exe: PathBuf, view_process_env: HashMap<Txt, Txt>) -> Self {
1207 APP_PROCESS_SV
1208 .read()
1209 .pause_time_for_updates
1210 .hook(|a| {
1211 if !matches!(INSTANT.mode(), zng_time::InstantMode::Manual) {
1212 if *a.value() {
1213 INSTANT_APP.set_mode(InstantMode::UpdatePaused);
1214 } else {
1215 INSTANT_APP.set_mode(InstantMode::Now);
1216 }
1217 }
1218 true
1219 })
1220 .perm();
1221
1222 if is_headed {
1223 debug_assert!(with_renderer);
1224
1225 let view_evs_sender = UPDATES.sender();
1226 VIEW_PROCESS.start(view_process_exe, view_process_env, false, move |ev| {
1227 let _ = view_evs_sender.send_view_event(ev);
1228 });
1229 } else if with_renderer {
1230 let view_evs_sender = UPDATES.sender();
1231 VIEW_PROCESS.start(view_process_exe, view_process_env, true, move |ev| {
1232 let _ = view_evs_sender.send_view_event(ev);
1233 });
1234 }
1235
1236 AppIntrinsic {
1237 exit_handle: EXIT_CMD.subscribe(true),
1238 pending_exit: None,
1239 }
1240 }
1241
1242 pub(super) fn exit(&mut self) -> bool {
1244 if let Some(pending) = self.pending_exit.take() {
1245 if pending.handle.is_stopped() {
1246 pending.response.respond(ExitCancelled);
1247 false
1248 } else {
1249 true
1250 }
1251 } else {
1252 false
1253 }
1254 }
1255}
1256impl AppExtension for AppIntrinsic {
1257 fn event_preview(&mut self, update: &mut EventUpdate) {
1258 if VIEW_PROCESS_INITED_EVENT.has(update) {
1259 let filter = APP_PROCESS_SV.read().device_events_filter.get();
1260 if !filter.is_empty()
1261 && let Err(e) = VIEW_PROCESS.set_device_events_filter(filter)
1262 {
1263 tracing::error!("cannot set device events on the view-process, {e}");
1264 }
1265 } else if let Some(args) = EXIT_CMD.on(update) {
1266 args.handle_enabled(&self.exit_handle, |_| {
1267 APP.exit();
1268 });
1269 }
1270 }
1271
1272 fn update(&mut self) {
1273 let mut sv = APP_PROCESS_SV.write();
1274 if let Some(filter) = sv.device_events_filter.get_new()
1275 && let Err(e) = VIEW_PROCESS.set_device_events_filter(filter)
1276 {
1277 tracing::error!("cannot set device events on the view-process, {e}");
1278 }
1279 if let Some(response) = sv.take_requests() {
1280 let args = ExitRequestedArgs::now();
1281 self.pending_exit = Some(PendingExit {
1282 handle: args.propagation().clone(),
1283 response,
1284 });
1285 EXIT_REQUESTED_EVENT.notify(args);
1286 }
1287 }
1288}
1289
1290pub(crate) fn assert_not_view_process() {
1291 if zng_view_api::ViewConfig::from_env().is_some() {
1292 panic!("cannot start App in view-process");
1293 }
1294}
1295#[cfg(feature = "deadlock_detection")]
1300pub fn spawn_deadlock_detection() {
1301 use parking_lot::deadlock;
1302 use std::{
1303 sync::atomic::{self, AtomicBool},
1304 thread,
1305 time::*,
1306 };
1307
1308 static CHECK_RUNNING: AtomicBool = AtomicBool::new(false);
1309
1310 if CHECK_RUNNING.swap(true, atomic::Ordering::SeqCst) {
1311 return;
1312 }
1313
1314 thread::spawn(|| {
1315 loop {
1316 thread::sleep(Duration::from_secs(10));
1317
1318 let deadlocks = deadlock::check_deadlock();
1319 if deadlocks.is_empty() {
1320 continue;
1321 }
1322
1323 use std::fmt::Write;
1324 let mut msg = String::new();
1325
1326 let _ = writeln!(&mut msg, "{} deadlocks detected", deadlocks.len());
1327 for (i, threads) in deadlocks.iter().enumerate() {
1328 let _ = writeln!(&mut msg, "Deadlock #{}, {} threads", i, threads.len());
1329 for t in threads {
1330 let _ = writeln!(&mut msg, "Thread Id {:#?}", t.thread_id());
1331 let _ = writeln!(&mut msg, "{:#?}", t.backtrace());
1332 }
1333 }
1334
1335 #[cfg(not(feature = "test_util"))]
1336 eprint!("{msg}");
1337
1338 #[cfg(feature = "test_util")]
1339 {
1340 use std::io::Write;
1343 let _ = write!(&mut std::io::stderr(), "{msg}");
1344 zng_env::exit(-1);
1345 }
1346 }
1347 });
1348}
1349#[cfg(not(feature = "deadlock_detection"))]
1354pub fn spawn_deadlock_detection() {}
1355
1356app_local! {
1357 pub(super) static APP_PROCESS_SV: AppProcessService = AppProcessService {
1358 exit_requests: None,
1359 extensions: None,
1360 device_events_filter: zng_var::var(Default::default()),
1361 pause_time_for_updates: zng_var::var(true),
1362 is_suspended: zng_var::var(false),
1363 };
1364}
1365
1366pub(super) struct AppProcessService {
1367 exit_requests: Option<ResponderVar<ExitCancelled>>,
1368 extensions: Option<Arc<AppExtensionsInfo>>,
1369 pub(crate) device_events_filter: Var<DeviceEventsFilter>,
1370 pause_time_for_updates: Var<bool>,
1371 is_suspended: Var<bool>,
1372}
1373impl AppProcessService {
1374 pub(super) fn take_requests(&mut self) -> Option<ResponderVar<ExitCancelled>> {
1375 self.exit_requests.take()
1376 }
1377
1378 fn exit(&mut self) -> ResponseVar<ExitCancelled> {
1379 if let Some(r) = &self.exit_requests {
1380 r.response_var()
1381 } else {
1382 let (responder, response) = response_var();
1383 self.exit_requests = Some(responder);
1384 UPDATES.update(None);
1385 response
1386 }
1387 }
1388
1389 pub(super) fn extensions(&self) -> Arc<AppExtensionsInfo> {
1390 self.extensions
1391 .clone()
1392 .unwrap_or_else(|| Arc::new(AppExtensionsInfo { infos: vec![] }))
1393 }
1394
1395 pub(super) fn set_extensions(&mut self, info: AppExtensionsInfo) {
1396 self.extensions = Some(Arc::new(info));
1397 }
1398
1399 pub(super) fn is_running(&self) -> bool {
1400 self.extensions.is_some()
1401 }
1402}
1403
1404#[derive(Debug)]
1406#[allow(clippy::large_enum_variant)] pub(crate) enum AppEvent {
1408 ViewEvent(zng_view_api::Event),
1410 Event(crate::event::EventUpdateMsg),
1412 Update(UpdateOp, Option<WidgetId>),
1414 ResumeUnwind(PanicPayload),
1416 CheckUpdate,
1418}
1419
1420#[derive(Clone)]
1426pub struct AppEventSender(flume::Sender<AppEvent>);
1427impl AppEventSender {
1428 pub(crate) fn new() -> (Self, flume::Receiver<AppEvent>) {
1429 let (sender, receiver) = flume::unbounded();
1430 (Self(sender), receiver)
1431 }
1432
1433 #[allow(clippy::result_large_err)] fn send_app_event(&self, event: AppEvent) -> Result<(), AppChannelError> {
1435 self.0.send(event).map_err(|_| AppChannelError::Disconnected)
1436 }
1437
1438 #[allow(clippy::result_large_err)]
1439 fn send_view_event(&self, event: zng_view_api::Event) -> Result<(), AppChannelError> {
1440 self.0.send(AppEvent::ViewEvent(event)).map_err(|_| AppChannelError::Disconnected)
1441 }
1442
1443 pub fn send_update(&self, op: UpdateOp, target: impl Into<Option<WidgetId>>) -> Result<(), AppChannelError> {
1445 UpdatesTrace::log_update();
1446 self.send_app_event(AppEvent::Update(op, target.into()))
1447 .map_err(|_| AppChannelError::Disconnected)
1448 }
1449
1450 pub(crate) fn send_event(&self, event: crate::event::EventUpdateMsg) -> Result<(), AppChannelError> {
1452 self.send_app_event(AppEvent::Event(event))
1453 .map_err(|_| AppChannelError::Disconnected)
1454 }
1455
1456 pub fn send_resume_unwind(&self, payload: PanicPayload) -> Result<(), AppChannelError> {
1458 self.send_app_event(AppEvent::ResumeUnwind(payload))
1459 .map_err(|_| AppChannelError::Disconnected)
1460 }
1461
1462 pub(crate) fn send_check_update(&self) -> Result<(), AppChannelError> {
1464 self.send_app_event(AppEvent::CheckUpdate)
1465 .map_err(|_| AppChannelError::Disconnected)
1466 }
1467
1468 pub fn waker(&self, target: impl Into<Option<WidgetId>>) -> Waker {
1470 Arc::new(AppWaker(self.0.clone(), target.into())).into()
1471 }
1472
1473 pub fn ext_channel<T>(&self) -> (AppExtSender<T>, AppExtReceiver<T>) {
1475 let (sender, receiver) = flume::unbounded();
1476
1477 (
1478 AppExtSender {
1479 update: self.clone(),
1480 sender,
1481 },
1482 AppExtReceiver { receiver },
1483 )
1484 }
1485
1486 pub fn ext_channel_bounded<T>(&self, cap: usize) -> (AppExtSender<T>, AppExtReceiver<T>) {
1488 let (sender, receiver) = flume::bounded(cap);
1489
1490 (
1491 AppExtSender {
1492 update: self.clone(),
1493 sender,
1494 },
1495 AppExtReceiver { receiver },
1496 )
1497 }
1498}
1499
1500struct AppWaker(flume::Sender<AppEvent>, Option<WidgetId>);
1501impl std::task::Wake for AppWaker {
1502 fn wake(self: std::sync::Arc<Self>) {
1503 self.wake_by_ref()
1504 }
1505 fn wake_by_ref(self: &Arc<Self>) {
1506 let _ = self.0.send(AppEvent::Update(UpdateOp::Update, self.1));
1507 }
1508}
1509
1510type PanicPayload = Box<dyn std::any::Any + Send + 'static>;
1511
1512pub struct AppExtSender<T> {
1516 update: AppEventSender,
1517 sender: flume::Sender<T>,
1518}
1519impl<T> Clone for AppExtSender<T> {
1520 fn clone(&self) -> Self {
1521 Self {
1522 update: self.update.clone(),
1523 sender: self.sender.clone(),
1524 }
1525 }
1526}
1527impl<T: Send> AppExtSender<T> {
1528 pub fn send(&self, msg: T) -> Result<(), AppChannelError> {
1530 match self.update.send_update(UpdateOp::Update, None) {
1531 Ok(()) => self.sender.send(msg).map_err(|_| AppChannelError::Disconnected),
1532 Err(_) => Err(AppChannelError::Disconnected),
1533 }
1534 }
1535
1536 pub fn send_timeout(&self, msg: T, dur: Duration) -> Result<(), AppChannelError> {
1538 match self.update.send_update(UpdateOp::Update, None) {
1539 Ok(()) => self.sender.send_timeout(msg, dur).map_err(|e| match e {
1540 flume::SendTimeoutError::Timeout(_) => AppChannelError::Timeout,
1541 flume::SendTimeoutError::Disconnected(_) => AppChannelError::Disconnected,
1542 }),
1543 Err(_) => Err(AppChannelError::Disconnected),
1544 }
1545 }
1546
1547 pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), AppChannelError> {
1549 match self.update.send_update(UpdateOp::Update, None) {
1550 Ok(()) => self.sender.send_deadline(msg, deadline).map_err(|e| match e {
1551 flume::SendTimeoutError::Timeout(_) => AppChannelError::Timeout,
1552 flume::SendTimeoutError::Disconnected(_) => AppChannelError::Disconnected,
1553 }),
1554 Err(_) => Err(AppChannelError::Disconnected),
1555 }
1556 }
1557}
1558
1559pub struct AppExtReceiver<T> {
1563 receiver: flume::Receiver<T>,
1564}
1565impl<T> Clone for AppExtReceiver<T> {
1566 fn clone(&self) -> Self {
1567 Self {
1568 receiver: self.receiver.clone(),
1569 }
1570 }
1571}
1572impl<T> AppExtReceiver<T> {
1573 pub fn try_recv(&self) -> Result<T, Option<AppChannelError>> {
1578 self.receiver.try_recv().map_err(|e| match e {
1579 flume::TryRecvError::Empty => None,
1580 flume::TryRecvError::Disconnected => Some(AppChannelError::Disconnected),
1581 })
1582 }
1583}
1584
1585#[derive(Debug, Clone)]
1587#[non_exhaustive]
1588pub enum AppChannelError {
1589 Disconnected,
1591 Timeout,
1593}
1594impl fmt::Display for AppChannelError {
1595 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1596 match self {
1597 AppChannelError::Disconnected => write!(f, "cannot receive because the sender disconnected"),
1598 AppChannelError::Timeout => write!(f, "deadline elapsed before message could be send/received"),
1599 }
1600 }
1601}
1602impl std::error::Error for AppChannelError {}
1603impl From<flume::RecvTimeoutError> for AppChannelError {
1604 fn from(value: flume::RecvTimeoutError) -> Self {
1605 match value {
1606 flume::RecvTimeoutError::Timeout => AppChannelError::Timeout,
1607 flume::RecvTimeoutError::Disconnected => AppChannelError::Disconnected,
1608 }
1609 }
1610}
1611
1612event_args! {
1613 pub struct ExitRequestedArgs {
1617
1618 ..
1619
1620 fn delivery_list(&self, list: &mut UpdateDeliveryList) {
1622 list.search_all()
1623 }
1624 }
1625}
1626
1627event! {
1628 pub static EXIT_REQUESTED_EVENT: ExitRequestedArgs;
1636}
1637
1638trait ReceiverExt<T> {
1640 fn recv_deadline_sp(&self, deadline: Deadline) -> Result<T, flume::RecvTimeoutError>;
1642}
1643
1644const WORST_SLEEP_ERR: Duration = Duration::from_millis(if cfg!(windows) { 20 } else { 10 });
1645const WORST_SPIN_ERR: Duration = Duration::from_millis(if cfg!(windows) { 2 } else { 1 });
1646
1647impl<T> ReceiverExt<T> for flume::Receiver<T> {
1648 fn recv_deadline_sp(&self, deadline: Deadline) -> Result<T, flume::RecvTimeoutError> {
1649 loop {
1650 if let Some(d) = deadline.0.checked_duration_since(INSTANT.now()) {
1651 if matches!(INSTANT.mode(), zng_time::InstantMode::Manual) {
1652 match self.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
1655 Err(flume::RecvTimeoutError::Timeout) => continue, interrupt => return interrupt,
1657 }
1658 } else if d > WORST_SLEEP_ERR {
1659 #[cfg(not(target_arch = "wasm32"))]
1661 match self.recv_deadline(deadline.0.checked_sub(WORST_SLEEP_ERR).unwrap().into()) {
1662 Err(flume::RecvTimeoutError::Timeout) => continue, interrupt => return interrupt,
1664 }
1665
1666 #[cfg(target_arch = "wasm32")] match self.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
1668 Err(flume::RecvTimeoutError::Timeout) => continue, interrupt => return interrupt,
1670 }
1671 } else if d > WORST_SPIN_ERR {
1672 let spin_deadline = Deadline(deadline.0.checked_sub(WORST_SPIN_ERR).unwrap());
1673
1674 while !spin_deadline.has_elapsed() {
1676 match self.try_recv() {
1677 Err(flume::TryRecvError::Empty) => std::thread::yield_now(),
1678 Err(flume::TryRecvError::Disconnected) => return Err(flume::RecvTimeoutError::Disconnected),
1679 Ok(msg) => return Ok(msg),
1680 }
1681 }
1682 continue; } else {
1684 while !deadline.has_elapsed() {
1686 std::thread::yield_now();
1687 }
1688 return Err(flume::RecvTimeoutError::Timeout);
1689 }
1690 } else {
1691 return Err(flume::RecvTimeoutError::Timeout);
1692 }
1693 }
1694 }
1695}