1use std::{
2 collections::HashMap,
3 fmt, mem,
4 path::PathBuf,
5 sync::Arc,
6 task::Waker,
7 time::{Duration, Instant},
8};
9
10use crate::Deadline;
11use parking_lot::Mutex;
12use zng_app_context::{AppScope, app_local};
13use zng_task::DEADLINE_APP;
14use zng_time::{INSTANT_APP, InstantMode};
15use zng_txt::Txt;
16use zng_var::{ResponderVar, ResponseVar, VARS_APP, Var, response_var};
17use zng_view_api::{DeviceEventsFilter, raw_input::InputDeviceEvent};
18
19use crate::{
20 APP, AppControlFlow, AppEventObserver, AppExtension, AppExtensionsInfo, DInstant, INSTANT,
21 event::{AnyEventArgs, CommandHandle, CommandInfoExt, CommandNameExt, EVENTS, EventPropagationHandle, command, event},
22 event_args,
23 shortcut::CommandShortcutExt,
24 shortcut::shortcut,
25 timer::TimersService,
26 update::{
27 ContextUpdates, EventUpdate, InfoUpdates, LayoutUpdates, RenderUpdates, UPDATES, UpdateOp, UpdateTrace, UpdatesTrace, WidgetUpdates,
28 },
29 view_process::{raw_device_events::InputDeviceId, *},
30 widget::WidgetId,
31 window::WindowId,
32};
33
34pub(crate) struct RunningApp<E: AppExtension> {
36 extensions: (AppIntrinsic, E),
37
38 receiver: flume::Receiver<AppEvent>,
39
40 loop_timer: LoopTimer,
41 loop_monitor: LoopMonitor,
42 last_wait_event: Instant,
43
44 pending_view_events: Vec<zng_view_api::Event>,
45 pending_view_frame_events: Vec<zng_view_api::window::EventFrameRendered>,
46 pending: ContextUpdates,
47
48 exited: bool,
49
50 _scope: AppScope,
52}
53impl<E: AppExtension> RunningApp<E> {
54 pub(crate) fn start(
55 scope: AppScope,
56 mut extensions: E,
57 is_headed: bool,
58 with_renderer: bool,
59 view_process_exe: Option<PathBuf>,
60 view_process_env: HashMap<Txt, Txt>,
61 ) -> Self {
62 let _s = tracing::debug_span!("APP::start").entered();
63
64 let (sender, receiver) = AppEventSender::new();
65
66 UPDATES.init(sender);
67
68 fn app_waker() {
69 UPDATES.update(None);
70 }
71 VARS_APP.init_app_waker(app_waker);
72 VARS_APP.init_modify_trace(UpdatesTrace::log_var);
73 DEADLINE_APP.init_deadline_service(crate::timer::deadline_service);
74 zng_var::animation::TRANSITIONABLE_APP.init_rgba_lerp(zng_color::lerp_rgba);
75
76 let mut info = AppExtensionsInfo::start();
77 {
78 let _t = INSTANT_APP.pause_for_update();
79 extensions.register(&mut info);
80 }
81
82 {
83 let mut sv = APP_PROCESS_SV.write();
84 sv.set_extensions(info);
85 }
86
87 if with_renderer && view_process_exe.is_none() {
88 zng_env::assert_inited();
89 }
90
91 #[cfg(not(target_arch = "wasm32"))]
92 let view_process_exe = view_process_exe.unwrap_or_else(|| std::env::current_exe().expect("current_exe"));
93 #[cfg(target_arch = "wasm32")]
94 let view_process_exe = std::path::PathBuf::from("<wasm>");
95
96 let process = AppIntrinsic::pre_init(is_headed, with_renderer, view_process_exe, view_process_env);
97
98 {
99 let _s = tracing::debug_span!("extensions.init").entered();
100 extensions.init();
101 }
102
103 let args = AppStartArgs { _private: () };
104 for h in zng_unique_id::hot_static_ref!(ON_APP_START).lock().iter_mut() {
105 h(&args)
106 }
107
108 RunningApp {
109 extensions: (process, extensions),
110
111 receiver,
112
113 loop_timer: LoopTimer::default(),
114 loop_monitor: LoopMonitor::default(),
115 last_wait_event: Instant::now(),
116
117 pending_view_events: Vec::with_capacity(100),
118 pending_view_frame_events: Vec::with_capacity(5),
119 pending: ContextUpdates {
120 events: Vec::with_capacity(100),
121 update: false,
122 info: false,
123 layout: false,
124 render: false,
125 update_widgets: WidgetUpdates::default(),
126 info_widgets: InfoUpdates::default(),
127 layout_widgets: LayoutUpdates::default(),
128 render_widgets: RenderUpdates::default(),
129 render_update_widgets: RenderUpdates::default(),
130 },
131 exited: false,
132
133 _scope: scope,
134 }
135 }
136
137 pub fn has_exited(&self) -> bool {
138 self.exited
139 }
140
141 pub fn notify_event<O: AppEventObserver>(&mut self, mut update: EventUpdate, observer: &mut O) {
143 let _scope = tracing::trace_span!("notify_event", event = update.event().name()).entered();
144
145 let _t = INSTANT_APP.pause_for_update();
146
147 update.event().on_update(&mut update);
148
149 self.extensions.event_preview(&mut update);
150 observer.event_preview(&mut update);
151 update.call_pre_actions();
152
153 self.extensions.event_ui(&mut update);
154 observer.event_ui(&mut update);
155
156 self.extensions.event(&mut update);
157 observer.event(&mut update);
158 update.call_pos_actions();
159 }
160
161 fn input_device_id(&mut self, id: zng_view_api::raw_input::InputDeviceId) -> InputDeviceId {
162 VIEW_PROCESS.input_device_id(id)
163 }
164
165 fn on_view_event<O: AppEventObserver>(&mut self, ev: zng_view_api::Event, observer: &mut O) {
167 use crate::view_process::raw_device_events::*;
168 use crate::view_process::raw_events::*;
169 use zng_view_api::Event;
170
171 fn window_id(id: zng_view_api::window::WindowId) -> WindowId {
172 WindowId::from_raw(id.get())
173 }
174
175 match ev {
176 Event::MouseMoved {
177 window: w_id,
178 device: d_id,
179 coalesced_pos,
180 position,
181 } => {
182 let args = RawMouseMovedArgs::now(window_id(w_id), self.input_device_id(d_id), coalesced_pos, position);
183 self.notify_event(RAW_MOUSE_MOVED_EVENT.new_update(args), observer);
184 }
185 Event::MouseEntered {
186 window: w_id,
187 device: d_id,
188 } => {
189 let args = RawMouseArgs::now(window_id(w_id), self.input_device_id(d_id));
190 self.notify_event(RAW_MOUSE_ENTERED_EVENT.new_update(args), observer);
191 }
192 Event::MouseLeft {
193 window: w_id,
194 device: d_id,
195 } => {
196 let args = RawMouseArgs::now(window_id(w_id), self.input_device_id(d_id));
197 self.notify_event(RAW_MOUSE_LEFT_EVENT.new_update(args), observer);
198 }
199 Event::WindowChanged(c) => {
200 let monitor_id = c.monitor.map(|id| VIEW_PROCESS.monitor_id(id));
201 let args = RawWindowChangedArgs::now(
202 window_id(c.window),
203 c.state,
204 c.position,
205 monitor_id,
206 c.size,
207 c.safe_padding,
208 c.cause,
209 c.frame_wait_id,
210 );
211 self.notify_event(RAW_WINDOW_CHANGED_EVENT.new_update(args), observer);
212 }
213 Event::DragHovered { window, data, allowed } => {
214 let args = RawDragHoveredArgs::now(window_id(window), data, allowed);
215 self.notify_event(RAW_DRAG_HOVERED_EVENT.new_update(args), observer);
216 }
217 Event::DragMoved {
218 window,
219 coalesced_pos,
220 position,
221 } => {
222 let args = RawDragMovedArgs::now(window_id(window), coalesced_pos, position);
223 self.notify_event(RAW_DRAG_MOVED_EVENT.new_update(args), observer);
224 }
225 Event::DragDropped {
226 window,
227 data,
228 allowed,
229 drop_id,
230 } => {
231 let args = RawDragDroppedArgs::now(window_id(window), data, allowed, drop_id);
232 self.notify_event(RAW_DRAG_DROPPED_EVENT.new_update(args), observer);
233 }
234 Event::DragCancelled { window } => {
235 let args = RawDragCancelledArgs::now(window_id(window));
236 self.notify_event(RAW_DRAG_CANCELLED_EVENT.new_update(args), observer);
237 }
238 Event::AppDragEnded { window, drag, applied } => {
239 let args = RawAppDragEndedArgs::now(window_id(window), drag, applied);
240 self.notify_event(RAW_APP_DRAG_ENDED_EVENT.new_update(args), observer);
241 }
242 Event::FocusChanged { prev, new } => {
243 let args = RawWindowFocusArgs::now(prev.map(window_id), new.map(window_id));
244 self.notify_event(RAW_WINDOW_FOCUS_EVENT.new_update(args), observer);
245 }
246 Event::KeyboardInput {
247 window: w_id,
248 device: d_id,
249 key_code,
250 state,
251 key,
252 key_location,
253 key_modified,
254 text,
255 } => {
256 let args = RawKeyInputArgs::now(
257 window_id(w_id),
258 self.input_device_id(d_id),
259 key_code,
260 key_location,
261 state,
262 key,
263 key_modified,
264 text,
265 );
266 self.notify_event(RAW_KEY_INPUT_EVENT.new_update(args), observer);
267 }
268 Event::Ime { window: w_id, ime } => {
269 let args = RawImeArgs::now(window_id(w_id), ime);
270 self.notify_event(RAW_IME_EVENT.new_update(args), observer);
271 }
272
273 Event::MouseWheel {
274 window: w_id,
275 device: d_id,
276 delta,
277 phase,
278 } => {
279 let args = RawMouseWheelArgs::now(window_id(w_id), self.input_device_id(d_id), delta, phase);
280 self.notify_event(RAW_MOUSE_WHEEL_EVENT.new_update(args), observer);
281 }
282 Event::MouseInput {
283 window: w_id,
284 device: d_id,
285 state,
286 button,
287 } => {
288 let args = RawMouseInputArgs::now(window_id(w_id), self.input_device_id(d_id), state, button);
289 self.notify_event(RAW_MOUSE_INPUT_EVENT.new_update(args), observer);
290 }
291 Event::TouchpadPressure {
292 window: w_id,
293 device: d_id,
294 pressure,
295 stage,
296 } => {
297 let args = RawTouchpadPressureArgs::now(window_id(w_id), self.input_device_id(d_id), pressure, stage);
298 self.notify_event(RAW_TOUCHPAD_PRESSURE_EVENT.new_update(args), observer);
299 }
300 Event::AxisMotion {
301 window: w_id,
302 device: d_id,
303 axis,
304 value,
305 } => {
306 let args = RawAxisMotionArgs::now(window_id(w_id), self.input_device_id(d_id), axis, value);
307 self.notify_event(RAW_AXIS_MOTION_EVENT.new_update(args), observer);
308 }
309 Event::Touch {
310 window: w_id,
311 device: d_id,
312 touches,
313 } => {
314 let args = RawTouchArgs::now(window_id(w_id), self.input_device_id(d_id), touches);
315 self.notify_event(RAW_TOUCH_EVENT.new_update(args), observer);
316 }
317 Event::ScaleFactorChanged {
318 monitor: id,
319 windows,
320 scale_factor,
321 } => {
322 let monitor_id = VIEW_PROCESS.monitor_id(id);
323 let windows: Vec<_> = windows.into_iter().map(window_id).collect();
324 let args = RawScaleFactorChangedArgs::now(monitor_id, windows, scale_factor);
325 self.notify_event(RAW_SCALE_FACTOR_CHANGED_EVENT.new_update(args), observer);
326 }
327 Event::MonitorsChanged(monitors) => {
328 let monitors: Vec<_> = monitors.into_iter().map(|(id, info)| (VIEW_PROCESS.monitor_id(id), info)).collect();
329 let args = RawMonitorsChangedArgs::now(monitors);
330 self.notify_event(RAW_MONITORS_CHANGED_EVENT.new_update(args), observer);
331 }
332 Event::AudioDevicesChanged(_audio_devices) => {
333 }
335 Event::WindowCloseRequested(w_id) => {
336 let args = RawWindowCloseRequestedArgs::now(window_id(w_id));
337 self.notify_event(RAW_WINDOW_CLOSE_REQUESTED_EVENT.new_update(args), observer);
338 }
339 Event::WindowOpened(w_id, data) => {
340 let w_id = window_id(w_id);
341 let (window, data) = VIEW_PROCESS.on_window_opened(w_id, data);
342 let args = RawWindowOpenArgs::now(w_id, window, data);
343 self.notify_event(RAW_WINDOW_OPEN_EVENT.new_update(args), observer);
344 }
345 Event::HeadlessOpened(w_id, data) => {
346 let w_id = window_id(w_id);
347 let (surface, data) = VIEW_PROCESS.on_headless_opened(w_id, data);
348 let args = RawHeadlessOpenArgs::now(w_id, surface, data);
349 self.notify_event(RAW_HEADLESS_OPEN_EVENT.new_update(args), observer);
350 }
351 Event::WindowOrHeadlessOpenError { id: w_id, error } => {
352 let w_id = window_id(w_id);
353 let args = RawWindowOrHeadlessOpenErrorArgs::now(w_id, error);
354 self.notify_event(RAW_WINDOW_OR_HEADLESS_OPEN_ERROR_EVENT.new_update(args), observer);
355 }
356 Event::WindowClosed(w_id) => {
357 let args = RawWindowCloseArgs::now(window_id(w_id));
358 self.notify_event(RAW_WINDOW_CLOSE_EVENT.new_update(args), observer);
359 }
360 Event::ImageMetadataLoaded {
361 image: id,
362 size,
363 density,
364 is_mask,
365 } => {
366 if let Some(img) = VIEW_PROCESS.on_image_metadata_loaded(id, size, density, is_mask) {
367 let args = RawImageArgs::now(img);
368 self.notify_event(RAW_IMAGE_METADATA_LOADED_EVENT.new_update(args), observer);
369 }
370 }
371 Event::ImagePartiallyLoaded {
372 image: id,
373 partial_size,
374 density,
375 is_opaque,
376 is_mask,
377 partial_pixels: partial_bgra8,
378 } => {
379 if let Some(img) = VIEW_PROCESS.on_image_partially_loaded(id, partial_size, density, is_opaque, is_mask, partial_bgra8) {
380 let args = RawImageArgs::now(img);
381 self.notify_event(RAW_IMAGE_PARTIALLY_LOADED_EVENT.new_update(args), observer);
382 }
383 }
384 Event::ImageLoaded(image) => {
385 if let Some(img) = VIEW_PROCESS.on_image_loaded(image) {
386 let args = RawImageArgs::now(img);
387 self.notify_event(RAW_IMAGE_LOADED_EVENT.new_update(args), observer);
388 }
389 }
390 Event::ImageLoadError { image: id, error } => {
391 if let Some(img) = VIEW_PROCESS.on_image_error(id, error) {
392 let args = RawImageArgs::now(img);
393 self.notify_event(RAW_IMAGE_LOAD_ERROR_EVENT.new_update(args), observer);
394 }
395 }
396 Event::ImageEncoded { image: id, format, data } => VIEW_PROCESS.on_image_encoded(id, format, data),
397 Event::ImageEncodeError { image: id, format, error } => {
398 VIEW_PROCESS.on_image_encode_error(id, format, error);
399 }
400 Event::FrameImageReady {
401 window: w_id,
402 frame: frame_id,
403 image: image_id,
404 selection,
405 } => {
406 if let Some(img) = VIEW_PROCESS.on_frame_image_ready(image_id) {
407 let args = RawFrameImageReadyArgs::now(img, window_id(w_id), frame_id, selection);
408 self.notify_event(RAW_FRAME_IMAGE_READY_EVENT.new_update(args), observer);
409 }
410 }
411
412 Event::AccessInit { window: w_id } => {
413 self.notify_event(crate::access::on_access_init(window_id(w_id)), observer);
414 }
415 Event::AccessCommand {
416 window: win_id,
417 target: wgt_id,
418 command,
419 } => {
420 if let Some(update) = crate::access::on_access_command(window_id(win_id), WidgetId::from_raw(wgt_id.0), command) {
421 self.notify_event(update, observer);
422 }
423 }
424 Event::AccessDeinit { window: w_id } => {
425 self.notify_event(crate::access::on_access_deinit(window_id(w_id)), observer);
426 }
427
428 Event::MsgDialogResponse(id, response) => {
430 VIEW_PROCESS.on_message_dlg_response(id, response);
431 }
432 Event::FileDialogResponse(id, response) => {
433 VIEW_PROCESS.on_file_dlg_response(id, response);
434 }
435
436 Event::ExtensionEvent(id, payload) => {
438 let args = RawExtensionEventArgs::now(id, payload);
439 self.notify_event(RAW_EXTENSION_EVENT.new_update(args), observer);
440 }
441
442 Event::FontsChanged => {
444 let args = RawFontChangedArgs::now();
445 self.notify_event(RAW_FONT_CHANGED_EVENT.new_update(args), observer);
446 }
447 Event::FontAaChanged(aa) => {
448 let args = RawFontAaChangedArgs::now(aa);
449 self.notify_event(RAW_FONT_AA_CHANGED_EVENT.new_update(args), observer);
450 }
451 Event::MultiClickConfigChanged(cfg) => {
452 let args = RawMultiClickConfigChangedArgs::now(cfg);
453 self.notify_event(RAW_MULTI_CLICK_CONFIG_CHANGED_EVENT.new_update(args), observer);
454 }
455 Event::AnimationsConfigChanged(cfg) => {
456 VARS_APP.set_sys_animations_enabled(cfg.enabled);
457 let args = RawAnimationsConfigChangedArgs::now(cfg);
458 self.notify_event(RAW_ANIMATIONS_CONFIG_CHANGED_EVENT.new_update(args), observer);
459 }
460 Event::KeyRepeatConfigChanged(cfg) => {
461 let args = RawKeyRepeatConfigChangedArgs::now(cfg);
462 self.notify_event(RAW_KEY_REPEAT_CONFIG_CHANGED_EVENT.new_update(args), observer);
463 }
464 Event::TouchConfigChanged(cfg) => {
465 let args = RawTouchConfigChangedArgs::now(cfg);
466 self.notify_event(RAW_TOUCH_CONFIG_CHANGED_EVENT.new_update(args), observer);
467 }
468 Event::LocaleChanged(cfg) => {
469 let args = RawLocaleChangedArgs::now(cfg);
470 self.notify_event(RAW_LOCALE_CONFIG_CHANGED_EVENT.new_update(args), observer);
471 }
472 Event::ColorsConfigChanged(cfg) => {
473 let args = RawColorsConfigChangedArgs::now(cfg);
474 self.notify_event(RAW_COLORS_CONFIG_CHANGED_EVENT.new_update(args), observer);
475 }
476 Event::ChromeConfigChanged(cfg) => {
477 let args = RawChromeConfigChangedArgs::now(cfg);
478 self.notify_event(RAW_CHROME_CONFIG_CHANGED_EVENT.new_update(args), observer);
479 }
480
481 Event::InputDevicesChanged(devices) => {
483 let devices: HashMap<_, _> = devices.into_iter().map(|(d_id, info)| (self.input_device_id(d_id), info)).collect();
484 INPUT_DEVICES.update(devices.clone());
485 let args = InputDevicesChangedArgs::now(devices);
486 self.notify_event(INPUT_DEVICES_CHANGED_EVENT.new_update(args), observer);
487 }
488 Event::InputDeviceEvent { device, event } => {
489 let d_id = self.input_device_id(device);
490 match event {
491 InputDeviceEvent::PointerMotion { delta } => {
492 let args = PointerMotionArgs::now(d_id, delta);
493 self.notify_event(POINTER_MOTION_EVENT.new_update(args), observer);
494 }
495 InputDeviceEvent::ScrollMotion { delta } => {
496 let args = ScrollMotionArgs::now(d_id, delta);
497 self.notify_event(SCROLL_MOTION_EVENT.new_update(args), observer);
498 }
499 InputDeviceEvent::AxisMotion { axis, value } => {
500 let args = AxisMotionArgs::now(d_id, axis, value);
501 self.notify_event(AXIS_MOTION_EVENT.new_update(args), observer);
502 }
503 InputDeviceEvent::Button { button, state } => {
504 let args = ButtonArgs::now(d_id, button, state);
505 self.notify_event(BUTTON_EVENT.new_update(args), observer);
506 }
507 InputDeviceEvent::Key { key_code, state } => {
508 let args = KeyArgs::now(d_id, key_code, state);
509 self.notify_event(KEY_EVENT.new_update(args), observer);
510 }
511 _ => {}
512 }
513 }
514
515 Event::LowMemory => {
516 LOW_MEMORY_EVENT.notify(LowMemoryArgs::now());
517 }
518
519 Event::RecoveredFromComponentPanic { component, recover, panic } => {
520 tracing::error!(
521 "view-process recovered from internal component panic\n component: {component}\n recover: {recover}\n```panic\n{panic}\n```"
522 );
523 }
524
525 Event::Inited(zng_view_api::Inited { .. }) | Event::Suspended | Event::Disconnected(_) | Event::FrameRendered(_) => {
527 unreachable!()
528 } _ => {}
531 }
532 }
533
534 fn on_view_rendered_event<O: AppEventObserver>(&mut self, ev: zng_view_api::window::EventFrameRendered, observer: &mut O) {
536 debug_assert!(ev.window != zng_view_api::window::WindowId::INVALID);
537 let window_id = WindowId::from_raw(ev.window.get());
538 let image = ev.frame_image.map(|img| VIEW_PROCESS.on_frame_image(img));
540 let args = crate::view_process::raw_events::RawFrameRenderedArgs::now(window_id, ev.frame, image);
541 self.notify_event(crate::view_process::raw_events::RAW_FRAME_RENDERED_EVENT.new_update(args), observer);
542 }
543
544 pub(crate) fn run_headed(mut self) {
545 let mut observer = ();
546 #[cfg(feature = "dyn_app_extension")]
547 let mut observer = observer.as_dyn();
548
549 self.apply_updates(&mut observer);
550 self.apply_update_events(&mut observer);
551 let mut wait = false;
552 loop {
553 wait = match self.poll_impl(wait, &mut observer) {
554 AppControlFlow::Poll => false,
555 AppControlFlow::Wait => true,
556 AppControlFlow::Exit => break,
557 };
558 }
559 }
560
561 fn push_coalesce<O: AppEventObserver>(&mut self, ev: AppEvent, observer: &mut O) {
562 match ev {
563 AppEvent::ViewEvent(ev) => match ev {
564 zng_view_api::Event::FrameRendered(ev) => {
565 if ev.window == zng_view_api::window::WindowId::INVALID {
566 tracing::error!("ignored rendered event for invalid window id, {ev:?}");
567 return;
568 }
569
570 let window = WindowId::from_raw(ev.window.get());
571
572 {
574 if VIEW_PROCESS.is_available() {
575 VIEW_PROCESS.on_frame_rendered(window);
576 }
577 }
578
579 #[cfg(debug_assertions)]
580 if self.pending_view_frame_events.iter().any(|e| e.window == ev.window) {
581 tracing::warn!("window `{window:?}` probably sent a frame request without awaiting renderer idle");
582 }
583
584 self.pending_view_frame_events.push(ev);
585 }
586 zng_view_api::Event::Pong(count) => VIEW_PROCESS.on_pong(count),
587 zng_view_api::Event::Inited(zng_view_api::Inited {
588 generation,
589 is_respawn,
590 extensions,
591 ..
592 }) => {
593 if is_respawn {
595 VIEW_PROCESS.on_respawned(generation);
596 APP_PROCESS_SV.read().is_suspended.set(false);
597 }
598
599 VIEW_PROCESS.handle_inited(generation, extensions.clone());
600
601 let args = crate::view_process::ViewProcessInitedArgs::now(generation, is_respawn, extensions);
602 self.notify_event(VIEW_PROCESS_INITED_EVENT.new_update(args), observer);
603 }
604 zng_view_api::Event::Suspended => {
605 VIEW_PROCESS.handle_suspended();
606 let args = crate::view_process::ViewProcessSuspendedArgs::now();
607 self.notify_event(VIEW_PROCESS_SUSPENDED_EVENT.new_update(args), observer);
608 APP_PROCESS_SV.read().is_suspended.set(true);
609 }
610 zng_view_api::Event::Disconnected(vp_gen) => {
611 VIEW_PROCESS.handle_disconnect(vp_gen);
613 }
614 ev => {
615 if let Some(last) = self.pending_view_events.last_mut() {
616 match last.coalesce(ev) {
617 Ok(()) => {}
618 Err(ev) => self.pending_view_events.push(ev),
619 }
620 } else {
621 self.pending_view_events.push(ev);
622 }
623 }
624 },
625 AppEvent::Event(ev) => EVENTS.notify(ev.get()),
626 AppEvent::Update(op, target) => {
627 UPDATES.update_op(op, target);
628 }
629 AppEvent::CheckUpdate => {}
630 AppEvent::ResumeUnwind(p) => std::panic::resume_unwind(p),
631 }
632 }
633
634 fn has_pending_updates(&mut self) -> bool {
635 !self.pending_view_events.is_empty() || self.pending.has_updates() || UPDATES.has_pending_updates() || !self.receiver.is_empty()
636 }
637
638 pub(crate) fn poll<O: AppEventObserver>(&mut self, wait_app_event: bool, observer: &mut O) -> AppControlFlow {
639 #[cfg(feature = "dyn_app_extension")]
640 let mut observer = observer.as_dyn();
641 #[cfg(feature = "dyn_app_extension")]
642 let observer = &mut observer;
643 self.poll_impl(wait_app_event, observer)
644 }
645 fn poll_impl<O: AppEventObserver>(&mut self, wait_app_event: bool, observer: &mut O) -> AppControlFlow {
646 let mut disconnected = false;
647
648 if self.exited {
649 return AppControlFlow::Exit;
650 }
651
652 if wait_app_event {
653 let idle = tracing::debug_span!("<idle>", ended_by = tracing::field::Empty).entered();
654
655 const PING_TIMER: Duration = Duration::from_secs(2);
656
657 let ping_timer = Deadline::timeout(PING_TIMER);
658 let timer = if self.view_is_busy() {
659 None
660 } else {
661 self.loop_timer.poll().map(|t| t.min(ping_timer))
662 };
663 match self.receiver.recv_deadline_sp(timer.unwrap_or(ping_timer)) {
664 Ok(ev) => {
665 idle.record("ended_by", "event");
666 drop(idle);
667 self.last_wait_event = Instant::now();
668 self.push_coalesce(ev, observer)
669 }
670 Err(e) => match e {
671 flume::RecvTimeoutError::Timeout => {
672 if timer.is_none() {
673 idle.record("ended_by", "timeout (ping)");
674 } else {
675 idle.record("ended_by", "timeout");
676 }
677 if self.last_wait_event.elapsed() >= PING_TIMER && !VIEW_PROCESS.is_same_process() && VIEW_PROCESS.is_connected() {
678 VIEW_PROCESS.ping();
679 }
680 }
681 flume::RecvTimeoutError::Disconnected => {
682 idle.record("ended_by", "disconnected");
683 disconnected = true
684 }
685 },
686 }
687 }
688 loop {
689 match self.receiver.try_recv() {
690 Ok(ev) => self.push_coalesce(ev, observer),
691 Err(e) => match e {
692 flume::TryRecvError::Empty => break,
693 flume::TryRecvError::Disconnected => {
694 disconnected = true;
695 break;
696 }
697 },
698 }
699 }
700 if disconnected {
701 panic!("app events channel disconnected");
702 }
703
704 if self.view_is_busy() {
705 return AppControlFlow::Wait;
706 }
707
708 UPDATES.on_app_awake();
709
710 let updated_timers = self.loop_timer.awake();
712 if updated_timers {
713 UPDATES.update_timers(&mut self.loop_timer);
715 self.apply_updates(observer);
716 }
717
718 let mut events = mem::take(&mut self.pending_view_events);
719 for ev in events.drain(..) {
720 self.on_view_event(ev, observer);
721 self.apply_updates(observer);
722 }
723 debug_assert!(self.pending_view_events.is_empty());
724 self.pending_view_events = events; let mut events = mem::take(&mut self.pending_view_frame_events);
727 for ev in events.drain(..) {
728 self.on_view_rendered_event(ev, observer);
729 }
730 self.pending_view_frame_events = events;
731
732 if self.has_pending_updates() {
733 self.apply_updates(observer);
734 self.apply_update_events(observer);
735 }
736
737 if self.view_is_busy() {
738 return AppControlFlow::Wait;
739 }
740
741 self.finish_frame(observer);
742
743 UPDATES.next_deadline(&mut self.loop_timer);
744
745 if self.extensions.0.exit() {
746 UPDATES.on_app_sleep();
747 self.exited = true;
748 AppControlFlow::Exit
749 } else if self.has_pending_updates() || UPDATES.has_pending_layout_or_render() {
750 AppControlFlow::Poll
751 } else {
752 UPDATES.on_app_sleep();
753 AppControlFlow::Wait
754 }
755 }
756
757 fn apply_updates<O: AppEventObserver>(&mut self, observer: &mut O) {
759 let _s = tracing::debug_span!("apply_updates").entered();
760
761 let mut run = true;
762 while run {
763 run = self.loop_monitor.update(|| {
764 let mut any = false;
765
766 self.pending |= UPDATES.apply_info();
767 if mem::take(&mut self.pending.info) {
768 any = true;
769 let _s = tracing::debug_span!("info").entered();
770
771 let mut info_widgets = mem::take(&mut self.pending.info_widgets);
772
773 let _t = INSTANT_APP.pause_for_update();
774
775 {
776 let _s = tracing::debug_span!("ext.info").entered();
777 self.extensions.info(&mut info_widgets);
778 }
779 {
780 let _s = tracing::debug_span!("obs.info").entered();
781 observer.info(&mut info_widgets);
782 }
783 }
784
785 self.pending |= UPDATES.apply_updates();
786 TimersService::notify();
787 if mem::take(&mut self.pending.update) {
788 any = true;
789 let _s = tracing::debug_span!("update").entered();
790
791 let mut update_widgets = mem::take(&mut self.pending.update_widgets);
792
793 let _t = INSTANT_APP.pause_for_update();
794
795 {
796 let _s = tracing::debug_span!("ext.update_preview").entered();
797 self.extensions.update_preview();
798 }
799 {
800 let _s = tracing::debug_span!("obs.update_preview").entered();
801 observer.update_preview();
802 }
803 UPDATES.on_pre_updates();
804
805 {
806 let _s = tracing::debug_span!("ext.update_ui").entered();
807 self.extensions.update_ui(&mut update_widgets);
808 }
809 {
810 let _s = tracing::debug_span!("obs.update_ui").entered();
811 observer.update_ui(&mut update_widgets);
812 }
813
814 {
815 let _s = tracing::debug_span!("ext.update").entered();
816 self.extensions.update();
817 }
818 {
819 let _s = tracing::debug_span!("obs.update").entered();
820 observer.update();
821 }
822 UPDATES.on_updates();
823 }
824
825 any
826 });
827 }
828 }
829
830 fn apply_update_events<O: AppEventObserver>(&mut self, observer: &mut O) {
832 let _s = tracing::debug_span!("apply_update_events").entered();
833
834 loop {
835 let events: Vec<_> = self.pending.events.drain(..).collect();
836 if events.is_empty() {
837 break;
838 }
839 for mut update in events {
840 let _s = tracing::debug_span!("update_event", ?update).entered();
841
842 self.loop_monitor.maybe_trace(|| {
843 let _t = INSTANT_APP.pause_for_update();
844
845 {
846 let _s = tracing::debug_span!("ext.event_preview").entered();
847 self.extensions.event_preview(&mut update);
848 }
849 {
850 let _s = tracing::debug_span!("obs.event_preview").entered();
851 observer.event_preview(&mut update);
852 }
853 update.call_pre_actions();
854
855 {
856 let _s = tracing::debug_span!("ext.event_ui").entered();
857 self.extensions.event_ui(&mut update);
858 }
859 {
860 let _s = tracing::debug_span!("obs.event_ui").entered();
861 observer.event_ui(&mut update);
862 }
863 {
864 let _s = tracing::debug_span!("ext.event").entered();
865 self.extensions.event(&mut update);
866 }
867 {
868 let _s = tracing::debug_span!("obs.event").entered();
869 observer.event(&mut update);
870 }
871 update.call_pos_actions();
872 });
873
874 self.apply_updates(observer);
875 }
876 }
877 }
878
879 fn view_is_busy(&mut self) -> bool {
880 VIEW_PROCESS.is_available() && VIEW_PROCESS.pending_frames() > 0
881 }
882
883 fn finish_frame<O: AppEventObserver>(&mut self, observer: &mut O) {
885 debug_assert!(!self.view_is_busy());
886
887 self.pending |= UPDATES.apply_layout_render();
888
889 while mem::take(&mut self.pending.layout) {
890 let _s = tracing::debug_span!("apply_layout").entered();
891
892 let mut layout_widgets = mem::take(&mut self.pending.layout_widgets);
893
894 self.loop_monitor.maybe_trace(|| {
895 let _t = INSTANT_APP.pause_for_update();
896
897 {
898 let _s = tracing::debug_span!("ext.layout").entered();
899 self.extensions.layout(&mut layout_widgets);
900 }
901 {
902 let _s = tracing::debug_span!("obs.layout").entered();
903 observer.layout(&mut layout_widgets);
904 }
905 });
906
907 self.apply_updates(observer);
908 self.apply_update_events(observer);
909 self.pending |= UPDATES.apply_layout_render();
910 }
911
912 if mem::take(&mut self.pending.render) {
913 let _s = tracing::debug_span!("apply_render").entered();
914
915 let mut render_widgets = mem::take(&mut self.pending.render_widgets);
916 let mut render_update_widgets = mem::take(&mut self.pending.render_update_widgets);
917
918 let _t = INSTANT_APP.pause_for_update();
919
920 {
921 let _s = tracing::debug_span!("ext.render").entered();
922 self.extensions.render(&mut render_widgets, &mut render_update_widgets);
923 }
924 {
925 let _s = tracing::debug_span!("obs.render").entered();
926 observer.render(&mut render_widgets, &mut render_update_widgets);
927 }
928 }
929
930 self.loop_monitor.finish_frame();
931 }
932}
933impl<E: AppExtension> Drop for RunningApp<E> {
934 fn drop(&mut self) {
935 let _s = tracing::debug_span!("ext.deinit").entered();
936 self.extensions.deinit();
937 VIEW_PROCESS.exit();
938 }
939}
940
941pub struct AppStartArgs {
946 _private: (),
947}
948
949pub fn on_app_start(handler: impl FnMut(&AppStartArgs) + Send + 'static) {
955 zng_unique_id::hot_static_ref!(ON_APP_START).lock().push(Box::new(handler))
956}
957zng_unique_id::hot_static! {
958 static ON_APP_START: Mutex<Vec<AppStartHandler>> = Mutex::new(vec![]);
959}
960type AppStartHandler = Box<dyn FnMut(&AppStartArgs) + Send + 'static>;
961
962#[derive(Debug)]
964pub(crate) struct LoopTimer {
965 now: DInstant,
966 deadline: Option<Deadline>,
967}
968impl Default for LoopTimer {
969 fn default() -> Self {
970 Self {
971 now: INSTANT.now(),
972 deadline: None,
973 }
974 }
975}
976impl LoopTimer {
977 pub fn elapsed(&mut self, deadline: Deadline) -> bool {
980 if deadline.0 <= self.now {
981 true
982 } else {
983 self.register(deadline);
984 false
985 }
986 }
987
988 pub fn register(&mut self, deadline: Deadline) {
990 if let Some(d) = &mut self.deadline {
991 if deadline < *d {
992 *d = deadline;
993 }
994 } else {
995 self.deadline = Some(deadline)
996 }
997 }
998
999 pub(crate) fn poll(&mut self) -> Option<Deadline> {
1001 self.deadline
1002 }
1003
1004 pub(crate) fn awake(&mut self) -> bool {
1006 self.now = INSTANT.now();
1007 if let Some(d) = self.deadline
1008 && d.0 <= self.now
1009 {
1010 self.deadline = None;
1011 return true;
1012 }
1013 false
1014 }
1015
1016 pub fn now(&self) -> DInstant {
1018 self.now
1019 }
1020}
1021impl zng_var::animation::AnimationTimer for LoopTimer {
1022 fn elapsed(&mut self, deadline: Deadline) -> bool {
1023 self.elapsed(deadline)
1024 }
1025
1026 fn register(&mut self, deadline: Deadline) {
1027 self.register(deadline)
1028 }
1029
1030 fn now(&self) -> DInstant {
1031 self.now()
1032 }
1033}
1034
1035#[derive(Default)]
1036struct LoopMonitor {
1037 update_count: u16,
1038 skipped: bool,
1039 trace: Vec<UpdateTrace>,
1040}
1041impl LoopMonitor {
1042 pub fn update(&mut self, update_once: impl FnOnce() -> bool) -> bool {
1044 self.update_count += 1;
1045
1046 if self.update_count < 500 {
1047 update_once()
1048 } else if self.update_count < 1000 {
1049 UpdatesTrace::collect_trace(&mut self.trace, update_once)
1050 } else if self.update_count == 1000 {
1051 self.skipped = true;
1052 let trace = UpdatesTrace::format_trace(mem::take(&mut self.trace));
1053 tracing::error!(
1054 "updated 1000 times without rendering, probably stuck in an infinite loop\n\
1055 will start skipping updates to render and poll system events\n\
1056 top 20 most frequent update requests (in 500 cycles):\n\
1057 {trace}\n\
1058 you can use `UpdatesTraceUiNodeExt` and `updates_trace_event` to refine the trace"
1059 );
1060 false
1061 } else if self.update_count == 1500 {
1062 self.update_count = 1001;
1063 false
1064 } else {
1065 update_once()
1066 }
1067 }
1068
1069 pub fn maybe_trace(&mut self, notify_once: impl FnOnce()) {
1070 if (500..1000).contains(&self.update_count) {
1071 UpdatesTrace::collect_trace(&mut self.trace, notify_once);
1072 } else {
1073 notify_once();
1074 }
1075 }
1076
1077 pub fn finish_frame(&mut self) {
1078 if !self.skipped {
1079 self.skipped = false;
1080 self.update_count = 0;
1081 self.trace = vec![];
1082 }
1083 }
1084}
1085
1086impl APP {
1087 pub fn exit(&self) -> ResponseVar<ExitCancelled> {
1096 APP_PROCESS_SV.write().exit()
1097 }
1098
1099 pub fn is_suspended(&self) -> Var<bool> {
1107 APP_PROCESS_SV.read().is_suspended.read_only()
1108 }
1109}
1110
1111impl APP {
1115 pub fn pause_time_for_update(&self) -> Var<bool> {
1121 APP_PROCESS_SV.read().pause_time_for_updates.clone()
1122 }
1123
1124 pub fn start_manual_time(&self) {
1132 INSTANT_APP.set_mode(InstantMode::Manual);
1133 INSTANT_APP.set_now(INSTANT.now());
1134 UPDATES.update(None);
1135 }
1136
1137 pub fn advance_manual_time(&self, advance: Duration) {
1148 INSTANT_APP.advance_now(advance);
1149 UPDATES.update(None);
1150 }
1151
1152 pub fn set_manual_time(&self, now: DInstant) {
1161 INSTANT_APP.set_now(now);
1162 UPDATES.update(None);
1163 }
1164
1165 pub fn end_manual_time(&self) {
1167 INSTANT_APP.set_mode(match APP.pause_time_for_update().get() {
1168 true => InstantMode::UpdatePaused,
1169 false => InstantMode::Now,
1170 });
1171 UPDATES.update(None);
1172 }
1173}
1174
1175command! {
1176 pub static EXIT_CMD = {
1180 l10n!: true,
1181 name: "Exit",
1182 info: "Close all windows and exit",
1183 shortcut: shortcut!(Exit),
1184 };
1185}
1186
1187#[derive(Clone, Copy, Debug, PartialEq, Eq)]
1191pub struct ExitCancelled;
1192impl fmt::Display for ExitCancelled {
1193 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1194 write!(f, "exit request cancelled")
1195 }
1196}
1197
1198struct AppIntrinsic {
1199 exit_handle: CommandHandle,
1200 pending_exit: Option<PendingExit>,
1201}
1202struct PendingExit {
1203 handle: EventPropagationHandle,
1204 response: ResponderVar<ExitCancelled>,
1205}
1206impl AppIntrinsic {
1207 pub(super) fn pre_init(is_headed: bool, with_renderer: bool, view_process_exe: PathBuf, view_process_env: HashMap<Txt, Txt>) -> Self {
1209 APP_PROCESS_SV
1210 .read()
1211 .pause_time_for_updates
1212 .hook(|a| {
1213 if !matches!(INSTANT.mode(), zng_time::InstantMode::Manual) {
1214 if *a.value() {
1215 INSTANT_APP.set_mode(InstantMode::UpdatePaused);
1216 } else {
1217 INSTANT_APP.set_mode(InstantMode::Now);
1218 }
1219 }
1220 true
1221 })
1222 .perm();
1223
1224 if is_headed {
1225 debug_assert!(with_renderer);
1226
1227 let view_evs_sender = UPDATES.sender();
1228 VIEW_PROCESS.start(view_process_exe, view_process_env, false, move |ev| {
1229 let _ = view_evs_sender.send_view_event(ev);
1230 });
1231 } else if with_renderer {
1232 let view_evs_sender = UPDATES.sender();
1233 VIEW_PROCESS.start(view_process_exe, view_process_env, true, move |ev| {
1234 let _ = view_evs_sender.send_view_event(ev);
1235 });
1236 }
1237
1238 AppIntrinsic {
1239 exit_handle: EXIT_CMD.subscribe(true),
1240 pending_exit: None,
1241 }
1242 }
1243
1244 pub(super) fn exit(&mut self) -> bool {
1246 if let Some(pending) = self.pending_exit.take() {
1247 if pending.handle.is_stopped() {
1248 pending.response.respond(ExitCancelled);
1249 false
1250 } else {
1251 true
1252 }
1253 } else {
1254 false
1255 }
1256 }
1257}
1258impl AppExtension for AppIntrinsic {
1259 fn event_preview(&mut self, update: &mut EventUpdate) {
1260 if VIEW_PROCESS_INITED_EVENT.has(update) {
1261 let filter = APP_PROCESS_SV.read().device_events_filter.get();
1262 if !filter.is_empty()
1263 && let Err(e) = VIEW_PROCESS.set_device_events_filter(filter)
1264 {
1265 tracing::error!("cannot set device events on the view-process, {e}");
1266 }
1267 } else if let Some(args) = EXIT_CMD.on(update) {
1268 args.handle_enabled(&self.exit_handle, |_| {
1269 APP.exit();
1270 });
1271 }
1272 }
1273
1274 fn update(&mut self) {
1275 let mut sv = APP_PROCESS_SV.write();
1276 if let Some(filter) = sv.device_events_filter.get_new()
1277 && let Err(e) = VIEW_PROCESS.set_device_events_filter(filter)
1278 {
1279 tracing::error!("cannot set device events on the view-process, {e}");
1280 }
1281 if let Some(response) = sv.take_requests() {
1282 let args = ExitRequestedArgs::now();
1283 self.pending_exit = Some(PendingExit {
1284 handle: args.propagation().clone(),
1285 response,
1286 });
1287 EXIT_REQUESTED_EVENT.notify(args);
1288 }
1289 }
1290}
1291
1292pub(crate) fn assert_not_view_process() {
1293 if zng_view_api::ViewConfig::from_env().is_some() {
1294 panic!("cannot start App in view-process");
1295 }
1296}
1297#[cfg(feature = "deadlock_detection")]
1302pub fn spawn_deadlock_detection() {
1303 use parking_lot::deadlock;
1304 use std::{
1305 sync::atomic::{self, AtomicBool},
1306 thread,
1307 time::*,
1308 };
1309
1310 static CHECK_RUNNING: AtomicBool = AtomicBool::new(false);
1311
1312 if CHECK_RUNNING.swap(true, atomic::Ordering::SeqCst) {
1313 return;
1314 }
1315
1316 thread::Builder::new()
1317 .name("deadlock_detection".into())
1318 .stack_size(256 * 1024)
1319 .spawn(|| {
1320 loop {
1321 thread::sleep(Duration::from_secs(10));
1322
1323 let deadlocks = deadlock::check_deadlock();
1324 if deadlocks.is_empty() {
1325 continue;
1326 }
1327
1328 use std::fmt::Write;
1329 let mut msg = String::new();
1330
1331 let _ = writeln!(&mut msg, "{} deadlocks detected", deadlocks.len());
1332 for (i, threads) in deadlocks.iter().enumerate() {
1333 let _ = writeln!(&mut msg, "Deadlock #{}, {} threads", i, threads.len());
1334 for t in threads {
1335 let _ = writeln!(&mut msg, "Thread Id {:#?}", t.thread_id());
1336 let _ = writeln!(&mut msg, "{:#?}", t.backtrace());
1337 }
1338 }
1339
1340 #[cfg(not(feature = "test_util"))]
1341 eprint!("{msg}");
1342
1343 #[cfg(feature = "test_util")]
1344 {
1345 use std::io::Write;
1348 let _ = write!(&mut std::io::stderr(), "{msg}");
1349 zng_env::exit(-1);
1350 }
1351 }
1352 })
1353 .expect("failed to spawn thread");
1354}
1355#[cfg(not(feature = "deadlock_detection"))]
1360pub fn spawn_deadlock_detection() {}
1361
1362app_local! {
1363 pub(super) static APP_PROCESS_SV: AppProcessService = AppProcessService {
1364 exit_requests: None,
1365 extensions: None,
1366 device_events_filter: zng_var::var(Default::default()),
1367 pause_time_for_updates: zng_var::var(true),
1368 is_suspended: zng_var::var(false),
1369 };
1370}
1371
1372pub(super) struct AppProcessService {
1373 exit_requests: Option<ResponderVar<ExitCancelled>>,
1374 extensions: Option<Arc<AppExtensionsInfo>>,
1375 pub(crate) device_events_filter: Var<DeviceEventsFilter>,
1376 pause_time_for_updates: Var<bool>,
1377 is_suspended: Var<bool>,
1378}
1379impl AppProcessService {
1380 pub(super) fn take_requests(&mut self) -> Option<ResponderVar<ExitCancelled>> {
1381 self.exit_requests.take()
1382 }
1383
1384 fn exit(&mut self) -> ResponseVar<ExitCancelled> {
1385 if let Some(r) = &self.exit_requests {
1386 r.response_var()
1387 } else {
1388 let (responder, response) = response_var();
1389 self.exit_requests = Some(responder);
1390 UPDATES.update(None);
1391 response
1392 }
1393 }
1394
1395 pub(super) fn extensions(&self) -> Arc<AppExtensionsInfo> {
1396 self.extensions
1397 .clone()
1398 .unwrap_or_else(|| Arc::new(AppExtensionsInfo { infos: vec![] }))
1399 }
1400
1401 pub(super) fn set_extensions(&mut self, info: AppExtensionsInfo) {
1402 self.extensions = Some(Arc::new(info));
1403 }
1404
1405 pub(super) fn is_running(&self) -> bool {
1406 self.extensions.is_some()
1407 }
1408}
1409
1410#[derive(Debug)]
1412#[allow(clippy::large_enum_variant)] pub(crate) enum AppEvent {
1414 ViewEvent(zng_view_api::Event),
1416 Event(crate::event::EventUpdateMsg),
1418 Update(UpdateOp, Option<WidgetId>),
1420 ResumeUnwind(PanicPayload),
1422 CheckUpdate,
1424}
1425
1426#[derive(Clone)]
1432pub struct AppEventSender(flume::Sender<AppEvent>);
1433impl AppEventSender {
1434 pub(crate) fn new() -> (Self, flume::Receiver<AppEvent>) {
1435 let (sender, receiver) = flume::unbounded();
1436 (Self(sender), receiver)
1437 }
1438
1439 #[allow(clippy::result_large_err)] fn send_app_event(&self, event: AppEvent) -> Result<(), AppChannelError> {
1441 self.0.send(event).map_err(|_| AppChannelError::Disconnected)
1442 }
1443
1444 #[allow(clippy::result_large_err)]
1445 fn send_view_event(&self, event: zng_view_api::Event) -> Result<(), AppChannelError> {
1446 self.0.send(AppEvent::ViewEvent(event)).map_err(|_| AppChannelError::Disconnected)
1447 }
1448
1449 pub fn send_update(&self, op: UpdateOp, target: impl Into<Option<WidgetId>>) -> Result<(), AppChannelError> {
1451 UpdatesTrace::log_update();
1452 self.send_app_event(AppEvent::Update(op, target.into()))
1453 .map_err(|_| AppChannelError::Disconnected)
1454 }
1455
1456 pub(crate) fn send_event(&self, event: crate::event::EventUpdateMsg) -> Result<(), AppChannelError> {
1458 self.send_app_event(AppEvent::Event(event))
1459 .map_err(|_| AppChannelError::Disconnected)
1460 }
1461
1462 pub fn send_resume_unwind(&self, payload: PanicPayload) -> Result<(), AppChannelError> {
1464 self.send_app_event(AppEvent::ResumeUnwind(payload))
1465 .map_err(|_| AppChannelError::Disconnected)
1466 }
1467
1468 pub(crate) fn send_check_update(&self) -> Result<(), AppChannelError> {
1470 self.send_app_event(AppEvent::CheckUpdate)
1471 .map_err(|_| AppChannelError::Disconnected)
1472 }
1473
1474 pub fn waker(&self, target: impl Into<Option<WidgetId>>) -> Waker {
1476 Arc::new(AppWaker(self.0.clone(), target.into())).into()
1477 }
1478
1479 pub fn ext_channel<T>(&self) -> (AppExtSender<T>, AppExtReceiver<T>) {
1481 let (sender, receiver) = flume::unbounded();
1482
1483 (
1484 AppExtSender {
1485 update: self.clone(),
1486 sender,
1487 },
1488 AppExtReceiver { receiver },
1489 )
1490 }
1491
1492 pub fn ext_channel_bounded<T>(&self, cap: usize) -> (AppExtSender<T>, AppExtReceiver<T>) {
1494 let (sender, receiver) = flume::bounded(cap);
1495
1496 (
1497 AppExtSender {
1498 update: self.clone(),
1499 sender,
1500 },
1501 AppExtReceiver { receiver },
1502 )
1503 }
1504}
1505
1506struct AppWaker(flume::Sender<AppEvent>, Option<WidgetId>);
1507impl std::task::Wake for AppWaker {
1508 fn wake(self: std::sync::Arc<Self>) {
1509 self.wake_by_ref()
1510 }
1511 fn wake_by_ref(self: &Arc<Self>) {
1512 let _ = self.0.send(AppEvent::Update(UpdateOp::Update, self.1));
1513 }
1514}
1515
1516type PanicPayload = Box<dyn std::any::Any + Send + 'static>;
1517
1518pub struct AppExtSender<T> {
1522 update: AppEventSender,
1523 sender: flume::Sender<T>,
1524}
1525impl<T> Clone for AppExtSender<T> {
1526 fn clone(&self) -> Self {
1527 Self {
1528 update: self.update.clone(),
1529 sender: self.sender.clone(),
1530 }
1531 }
1532}
1533impl<T: Send> AppExtSender<T> {
1534 pub fn send(&self, msg: T) -> Result<(), AppChannelError> {
1536 match self.update.send_update(UpdateOp::Update, None) {
1537 Ok(()) => self.sender.send(msg).map_err(|_| AppChannelError::Disconnected),
1538 Err(_) => Err(AppChannelError::Disconnected),
1539 }
1540 }
1541
1542 pub fn send_timeout(&self, msg: T, dur: Duration) -> Result<(), AppChannelError> {
1544 match self.update.send_update(UpdateOp::Update, None) {
1545 Ok(()) => self.sender.send_timeout(msg, dur).map_err(|e| match e {
1546 flume::SendTimeoutError::Timeout(_) => AppChannelError::Timeout,
1547 flume::SendTimeoutError::Disconnected(_) => AppChannelError::Disconnected,
1548 }),
1549 Err(_) => Err(AppChannelError::Disconnected),
1550 }
1551 }
1552
1553 pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), AppChannelError> {
1555 match self.update.send_update(UpdateOp::Update, None) {
1556 Ok(()) => self.sender.send_deadline(msg, deadline).map_err(|e| match e {
1557 flume::SendTimeoutError::Timeout(_) => AppChannelError::Timeout,
1558 flume::SendTimeoutError::Disconnected(_) => AppChannelError::Disconnected,
1559 }),
1560 Err(_) => Err(AppChannelError::Disconnected),
1561 }
1562 }
1563}
1564
1565pub struct AppExtReceiver<T> {
1569 receiver: flume::Receiver<T>,
1570}
1571impl<T> Clone for AppExtReceiver<T> {
1572 fn clone(&self) -> Self {
1573 Self {
1574 receiver: self.receiver.clone(),
1575 }
1576 }
1577}
1578impl<T> AppExtReceiver<T> {
1579 pub fn try_recv(&self) -> Result<T, Option<AppChannelError>> {
1584 self.receiver.try_recv().map_err(|e| match e {
1585 flume::TryRecvError::Empty => None,
1586 flume::TryRecvError::Disconnected => Some(AppChannelError::Disconnected),
1587 })
1588 }
1589}
1590
1591#[derive(Debug, Clone)]
1593#[non_exhaustive]
1594pub enum AppChannelError {
1595 Disconnected,
1597 Timeout,
1599}
1600impl fmt::Display for AppChannelError {
1601 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1602 match self {
1603 AppChannelError::Disconnected => write!(f, "cannot receive because the sender disconnected"),
1604 AppChannelError::Timeout => write!(f, "deadline elapsed before message could be send/received"),
1605 }
1606 }
1607}
1608impl std::error::Error for AppChannelError {}
1609impl From<flume::RecvTimeoutError> for AppChannelError {
1610 fn from(value: flume::RecvTimeoutError) -> Self {
1611 match value {
1612 flume::RecvTimeoutError::Timeout => AppChannelError::Timeout,
1613 flume::RecvTimeoutError::Disconnected => AppChannelError::Disconnected,
1614 }
1615 }
1616}
1617
1618event_args! {
1619 pub struct ExitRequestedArgs {
1623
1624 ..
1625
1626 fn delivery_list(&self, list: &mut UpdateDeliveryList) {
1628 list.search_all()
1629 }
1630 }
1631}
1632
1633event! {
1634 pub static EXIT_REQUESTED_EVENT: ExitRequestedArgs;
1642}
1643
1644trait ReceiverExt<T> {
1646 fn recv_deadline_sp(&self, deadline: Deadline) -> Result<T, flume::RecvTimeoutError>;
1648}
1649
1650const WORST_SLEEP_ERR: Duration = Duration::from_millis(if cfg!(windows) { 20 } else { 10 });
1651const WORST_SPIN_ERR: Duration = Duration::from_millis(if cfg!(windows) { 2 } else { 1 });
1652
1653impl<T> ReceiverExt<T> for flume::Receiver<T> {
1654 fn recv_deadline_sp(&self, deadline: Deadline) -> Result<T, flume::RecvTimeoutError> {
1655 loop {
1656 if let Some(d) = deadline.0.checked_duration_since(INSTANT.now()) {
1657 if matches!(INSTANT.mode(), zng_time::InstantMode::Manual) {
1658 match self.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
1661 Err(flume::RecvTimeoutError::Timeout) => continue, interrupt => return interrupt,
1663 }
1664 } else if d > WORST_SLEEP_ERR {
1665 #[cfg(not(target_arch = "wasm32"))]
1667 match self.recv_deadline(deadline.0.checked_sub(WORST_SLEEP_ERR).unwrap().into()) {
1668 Err(flume::RecvTimeoutError::Timeout) => continue, interrupt => return interrupt,
1670 }
1671
1672 #[cfg(target_arch = "wasm32")] match self.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
1674 Err(flume::RecvTimeoutError::Timeout) => continue, interrupt => return interrupt,
1676 }
1677 } else if d > WORST_SPIN_ERR {
1678 let spin_deadline = Deadline(deadline.0.checked_sub(WORST_SPIN_ERR).unwrap());
1679
1680 while !spin_deadline.has_elapsed() {
1682 match self.try_recv() {
1683 Err(flume::TryRecvError::Empty) => std::thread::yield_now(),
1684 Err(flume::TryRecvError::Disconnected) => return Err(flume::RecvTimeoutError::Disconnected),
1685 Ok(msg) => return Ok(msg),
1686 }
1687 }
1688 continue; } else {
1690 while !deadline.has_elapsed() {
1692 std::thread::yield_now();
1693 }
1694 return Err(flume::RecvTimeoutError::Timeout);
1695 }
1696 } else {
1697 return Err(flume::RecvTimeoutError::Timeout);
1698 }
1699 }
1700 }
1701}