1use std::{
2 collections::HashMap,
3 fmt, mem,
4 path::PathBuf,
5 sync::Arc,
6 task::Waker,
7 time::{Duration, Instant},
8};
9
10use crate::Deadline;
11use parking_lot::Mutex;
12use zng_app_context::{AppScope, app_local};
13use zng_task::DEADLINE_APP;
14use zng_time::{INSTANT_APP, InstantMode};
15use zng_txt::Txt;
16use zng_var::{ArcVar, ReadOnlyArcVar, ResponderVar, ResponseVar, VARS, VARS_APP, Var as _, response_var};
17
18use crate::{
19 APP, AppControlFlow, AppEventObserver, AppExtension, AppExtensionsInfo, DInstant, INSTANT,
20 event::{AnyEventArgs, CommandHandle, CommandInfoExt, CommandNameExt, EVENTS, EventPropagationHandle, command, event},
21 event_args,
22 shortcut::CommandShortcutExt,
23 shortcut::shortcut,
24 timer::TimersService,
25 update::{
26 ContextUpdates, EventUpdate, InfoUpdates, LayoutUpdates, RenderUpdates, UPDATES, UpdateOp, UpdateTrace, UpdatesTrace, WidgetUpdates,
27 },
28 view_process::{raw_device_events::DeviceId, *},
29 widget::WidgetId,
30 window::WindowId,
31};
32
/// State of a started app: the extensions plus everything the main loop (`poll_impl`)
/// carries from one iteration to the next.
pub(crate) struct RunningApp<E: AppExtension> {
    /// The intrinsic app extension (created by `AppIntrinsic::pre_init`) followed by the
    /// user extensions.
    extensions: (AppIntrinsic, E),

    /// Receiving end of the app event channel created in `start`.
    receiver: flume::Receiver<AppEvent>,

    /// Tracks the loop time and the next deadline the loop must wake for.
    loop_timer: LoopTimer,
    /// Counts update passes between frames to detect update loops that never render.
    loop_monitor: LoopMonitor,

    /// View-process events received by `push_coalesce` that are notified by the next `poll_impl`.
    pending_view_events: Vec<zng_view_api::Event>,
    /// Frame rendered events received by `push_coalesce`, notified after `pending_view_events`.
    pending_view_frame_events: Vec<zng_view_api::window::EventFrameRendered>,
    /// Update requests already taken from `UPDATES` but not applied yet.
    pending: ContextUpdates,

    /// Set once the intrinsic extension confirms the exit, after that `poll_impl` only returns `Exit`.
    exited: bool,

    // Kept only for its lifetime. Fields drop in declaration order, so the scope is released
    // after every other field.
    _scope: AppScope,
}
51impl<E: AppExtension> RunningApp<E> {
    /// Starts the app: connects the update, var and deadline services to this app, registers and
    /// inits the extensions, calls the `on_app_start` handlers and returns the loop state.
    ///
    /// * `scope` - stored in the returned value for as long as it lives.
    /// * `extensions` - app extensions; `register`, `enable_device_events` and `init` are called here.
    /// * `is_headed`, `with_renderer`, `view_process_env` - forwarded to `AppIntrinsic::pre_init`.
    /// * `view_process_exe` - view-process executable, if `None` the current executable is used.
    ///   On `wasm32` the argument is ignored and the placeholder path `"<wasm>"` is used.
    ///
    /// # Panics
    ///
    /// On non-wasm targets, panics if `view_process_exe` is `None` and `std::env::current_exe` fails.
    pub(crate) fn start(
        scope: AppScope,
        mut extensions: E,
        is_headed: bool,
        with_renderer: bool,
        view_process_exe: Option<PathBuf>,
        view_process_env: HashMap<Txt, Txt>,
    ) -> Self {
        let _s = tracing::debug_span!("APP::start").entered();

        // the sender is given to `UPDATES`, the receiver is drained by `poll_impl`.
        let (sender, receiver) = AppEventSender::new();

        UPDATES.init(sender);

        // waker given to the var service, it just requests an app update.
        fn app_waker() {
            UPDATES.update(None);
        }
        VARS_APP.init_app_waker(app_waker);
        VARS_APP.init_modify_trace(UpdatesTrace::log_var);
        DEADLINE_APP.init_deadline_service(crate::timer::deadline_service);
        zng_var::types::TRANSITIONABLE_APP.init_rgba_lerp(zng_color::lerp_rgba);

        let mut info = AppExtensionsInfo::start();
        {
            // the guard is held only while the extensions register.
            let _t = INSTANT_APP.pause_for_update();
            extensions.register(&mut info);
        }
        let device_events = extensions.enable_device_events();

        {
            // scoped so the write lock is released before the rest of the init.
            let mut sv = APP_PROCESS_SV.write();
            sv.set_extensions(info, device_events);
        }

        // only checked when a renderer is requested and the view-process exe is the default.
        if with_renderer && view_process_exe.is_none() {
            zng_env::assert_inited();
        }

        #[cfg(not(target_arch = "wasm32"))]
        let view_process_exe = view_process_exe.unwrap_or_else(|| std::env::current_exe().expect("current_exe"));
        #[cfg(target_arch = "wasm32")]
        let view_process_exe = std::path::PathBuf::from("<wasm>");

        let process = AppIntrinsic::pre_init(is_headed, with_renderer, view_process_exe, view_process_env, device_events);

        {
            let _s = tracing::debug_span!("extensions.init").entered();
            extensions.init();
        }

        // handlers registered with `on_app_start` run after all extensions are inited,
        // in the order they were registered.
        let args = AppStartArgs { _private: () };
        for h in zng_unique_id::hot_static_ref!(ON_APP_START).lock().iter_mut() {
            h(&args)
        }

        RunningApp {
            extensions: (process, extensions),

            receiver,

            loop_timer: LoopTimer::default(),
            loop_monitor: LoopMonitor::default(),

            pending_view_events: Vec::with_capacity(100),
            pending_view_frame_events: Vec::with_capacity(5),
            pending: ContextUpdates {
                events: Vec::with_capacity(100),
                update: false,
                info: false,
                layout: false,
                render: false,
                update_widgets: WidgetUpdates::default(),
                info_widgets: InfoUpdates::default(),
                layout_widgets: LayoutUpdates::default(),
                render_widgets: RenderUpdates::default(),
                render_update_widgets: RenderUpdates::default(),
            },
            exited: false,

            _scope: scope,
        }
    }
134
    /// Returns `true` after `poll_impl` has returned `AppControlFlow::Exit` because the exit
    /// was confirmed by the intrinsic extension.
    pub fn has_exited(&self) -> bool {
        self.exited
    }
138
    /// Notifies `update` immediately, without going through the pending events list.
    ///
    /// The sequence is: the event's own `on_update`, the preview handlers, the update
    /// pre-actions, the UI handlers, the event handlers and the update pos-actions. In each
    /// handler step the app extensions are visited before `observer`.
    pub fn notify_event<O: AppEventObserver>(&mut self, mut update: EventUpdate, observer: &mut O) {
        let _scope = tracing::trace_span!("notify_event", event = update.event().name()).entered();

        // guard held for the entire notification.
        let _t = INSTANT_APP.pause_for_update();

        update.event().on_update(&mut update);

        self.extensions.event_preview(&mut update);
        observer.event_preview(&mut update);
        update.call_pre_actions();

        self.extensions.event_ui(&mut update);
        observer.event_ui(&mut update);

        self.extensions.event(&mut update);
        observer.event(&mut update);
        update.call_pos_actions();
    }
158
    /// Converts a view API device id to the app `DeviceId`, delegates to `VIEW_PROCESS.device_id`.
    fn device_id(&mut self, id: zng_view_api::DeviceId) -> DeviceId {
        VIEW_PROCESS.device_id(id)
    }
162
    /// Handles one pending view-process event.
    ///
    /// Most events are converted to the matching raw event args (view API ids are mapped to app
    /// ids) and notified immediately with `notify_event`. Image encode results and dialog responses
    /// are only forwarded to `VIEW_PROCESS`, low memory is notified with `LOW_MEMORY_EVENT.notify`
    /// and a recovered component panic is only logged.
    ///
    /// # Panics
    ///
    /// Panics if `ev` is `Inited`, `Suspended`, `Disconnected` or `FrameRendered`, `push_coalesce`
    /// handles these and never adds them to the pending events.
    fn on_view_event<O: AppEventObserver>(&mut self, ev: zng_view_api::Event, observer: &mut O) {
        use crate::view_process::raw_device_events::*;
        use crate::view_process::raw_events::*;
        use zng_view_api::Event;

        // view API window id to app window id, same raw value.
        fn window_id(id: zng_view_api::window::WindowId) -> WindowId {
            WindowId::from_raw(id.get())
        }

        match ev {
            // mouse, window state, drag&drop, focus and keyboard events of a window.
            Event::MouseMoved {
                window: w_id,
                device: d_id,
                coalesced_pos,
                position,
            } => {
                let args = RawMouseMovedArgs::now(window_id(w_id), self.device_id(d_id), coalesced_pos, position);
                self.notify_event(RAW_MOUSE_MOVED_EVENT.new_update(args), observer);
            }
            Event::MouseEntered {
                window: w_id,
                device: d_id,
            } => {
                let args = RawMouseArgs::now(window_id(w_id), self.device_id(d_id));
                self.notify_event(RAW_MOUSE_ENTERED_EVENT.new_update(args), observer);
            }
            Event::MouseLeft {
                window: w_id,
                device: d_id,
            } => {
                let args = RawMouseArgs::now(window_id(w_id), self.device_id(d_id));
                self.notify_event(RAW_MOUSE_LEFT_EVENT.new_update(args), observer);
            }
            Event::WindowChanged(c) => {
                let monitor_id = c.monitor.map(|id| VIEW_PROCESS.monitor_id(id));
                let args = RawWindowChangedArgs::now(
                    window_id(c.window),
                    c.state,
                    c.position,
                    monitor_id,
                    c.size,
                    c.safe_padding,
                    c.cause,
                    c.frame_wait_id,
                );
                self.notify_event(RAW_WINDOW_CHANGED_EVENT.new_update(args), observer);
            }
            Event::DragHovered { window, data, allowed } => {
                let args = RawDragHoveredArgs::now(window_id(window), data, allowed);
                self.notify_event(RAW_DRAG_HOVERED_EVENT.new_update(args), observer);
            }
            Event::DragMoved {
                window,
                coalesced_pos,
                position,
            } => {
                let args = RawDragMovedArgs::now(window_id(window), coalesced_pos, position);
                self.notify_event(RAW_DRAG_MOVED_EVENT.new_update(args), observer);
            }
            Event::DragDropped {
                window,
                data,
                allowed,
                drop_id,
            } => {
                let args = RawDragDroppedArgs::now(window_id(window), data, allowed, drop_id);
                self.notify_event(RAW_DRAG_DROPPED_EVENT.new_update(args), observer);
            }
            Event::DragCancelled { window } => {
                let args = RawDragCancelledArgs::now(window_id(window));
                self.notify_event(RAW_DRAG_CANCELLED_EVENT.new_update(args), observer);
            }
            Event::AppDragEnded { window, drag, applied } => {
                let args = RawAppDragEndedArgs::now(window_id(window), drag, applied);
                self.notify_event(RAW_APP_DRAG_ENDED_EVENT.new_update(args), observer);
            }
            Event::FocusChanged { prev, new } => {
                let args = RawWindowFocusArgs::now(prev.map(window_id), new.map(window_id));
                self.notify_event(RAW_WINDOW_FOCUS_EVENT.new_update(args), observer);
            }
            Event::KeyboardInput {
                window: w_id,
                device: d_id,
                key_code,
                state,
                key,
                key_location,
                key_modified,
                text,
            } => {
                // note that the args order is not the same as the event fields order.
                let args = RawKeyInputArgs::now(
                    window_id(w_id),
                    self.device_id(d_id),
                    key_code,
                    key_location,
                    state,
                    key,
                    key_modified,
                    text,
                );
                self.notify_event(RAW_KEY_INPUT_EVENT.new_update(args), observer);
            }
            Event::Ime { window: w_id, ime } => {
                let args = RawImeArgs::now(window_id(w_id), ime);
                self.notify_event(RAW_IME_EVENT.new_update(args), observer);
            }

            Event::MouseWheel {
                window: w_id,
                device: d_id,
                delta,
                phase,
            } => {
                let args = RawMouseWheelArgs::now(window_id(w_id), self.device_id(d_id), delta, phase);
                self.notify_event(RAW_MOUSE_WHEEL_EVENT.new_update(args), observer);
            }
            Event::MouseInput {
                window: w_id,
                device: d_id,
                state,
                button,
            } => {
                let args = RawMouseInputArgs::now(window_id(w_id), self.device_id(d_id), state, button);
                self.notify_event(RAW_MOUSE_INPUT_EVENT.new_update(args), observer);
            }
            Event::TouchpadPressure {
                window: w_id,
                device: d_id,
                pressure,
                stage,
            } => {
                let args = RawTouchpadPressureArgs::now(window_id(w_id), self.device_id(d_id), pressure, stage);
                self.notify_event(RAW_TOUCHPAD_PRESSURE_EVENT.new_update(args), observer);
            }
            Event::AxisMotion {
                window: w_id,
                device: d_id,
                axis,
                value,
            } => {
                let args = RawAxisMotionArgs::now(window_id(w_id), self.device_id(d_id), axis, value);
                self.notify_event(RAW_AXIS_MOTION_EVENT.new_update(args), observer);
            }
            Event::Touch {
                window: w_id,
                device: d_id,
                touches,
            } => {
                let args = RawTouchArgs::now(window_id(w_id), self.device_id(d_id), touches);
                self.notify_event(RAW_TOUCH_EVENT.new_update(args), observer);
            }
            // monitors and window lifetime.
            Event::ScaleFactorChanged {
                monitor: id,
                windows,
                scale_factor,
            } => {
                let monitor_id = VIEW_PROCESS.monitor_id(id);
                let windows: Vec<_> = windows.into_iter().map(window_id).collect();
                let args = RawScaleFactorChangedArgs::now(monitor_id, windows, scale_factor);
                self.notify_event(RAW_SCALE_FACTOR_CHANGED_EVENT.new_update(args), observer);
            }
            Event::MonitorsChanged(monitors) => {
                let monitors: Vec<_> = monitors.into_iter().map(|(id, info)| (VIEW_PROCESS.monitor_id(id), info)).collect();
                let args = RawMonitorsChangedArgs::now(monitors);
                self.notify_event(RAW_MONITORS_CHANGED_EVENT.new_update(args), observer);
            }
            Event::WindowCloseRequested(w_id) => {
                let args = RawWindowCloseRequestedArgs::now(window_id(w_id));
                self.notify_event(RAW_WINDOW_CLOSE_REQUESTED_EVENT.new_update(args), observer);
            }
            Event::WindowOpened(w_id, data) => {
                let w_id = window_id(w_id);
                let (window, data) = VIEW_PROCESS.on_window_opened(w_id, data);
                let args = RawWindowOpenArgs::now(w_id, window, data);
                self.notify_event(RAW_WINDOW_OPEN_EVENT.new_update(args), observer);
            }
            Event::HeadlessOpened(w_id, data) => {
                let w_id = window_id(w_id);
                let (surface, data) = VIEW_PROCESS.on_headless_opened(w_id, data);
                let args = RawHeadlessOpenArgs::now(w_id, surface, data);
                self.notify_event(RAW_HEADLESS_OPEN_EVENT.new_update(args), observer);
            }
            Event::WindowOrHeadlessOpenError { id: w_id, error } => {
                let w_id = window_id(w_id);
                let args = RawWindowOrHeadlessOpenErrorArgs::now(w_id, error);
                self.notify_event(RAW_WINDOW_OR_HEADLESS_OPEN_ERROR_EVENT.new_update(args), observer);
            }
            Event::WindowClosed(w_id) => {
                let args = RawWindowCloseArgs::now(window_id(w_id));
                self.notify_event(RAW_WINDOW_CLOSE_EVENT.new_update(args), observer);
            }
            // images, the raw event is only notified if `VIEW_PROCESS` returns an image.
            Event::ImageMetadataLoaded {
                image: id,
                size,
                ppi,
                is_mask,
            } => {
                if let Some(img) = VIEW_PROCESS.on_image_metadata_loaded(id, size, ppi, is_mask) {
                    let args = RawImageArgs::now(img);
                    self.notify_event(RAW_IMAGE_METADATA_LOADED_EVENT.new_update(args), observer);
                }
            }
            Event::ImagePartiallyLoaded {
                image: id,
                partial_size,
                ppi,
                is_opaque,
                is_mask,
                partial_pixels: partial_bgra8,
            } => {
                if let Some(img) = VIEW_PROCESS.on_image_partially_loaded(id, partial_size, ppi, is_opaque, is_mask, partial_bgra8) {
                    let args = RawImageArgs::now(img);
                    self.notify_event(RAW_IMAGE_PARTIALLY_LOADED_EVENT.new_update(args), observer);
                }
            }
            Event::ImageLoaded(image) => {
                if let Some(img) = VIEW_PROCESS.on_image_loaded(image) {
                    let args = RawImageArgs::now(img);
                    self.notify_event(RAW_IMAGE_LOADED_EVENT.new_update(args), observer);
                }
            }
            Event::ImageLoadError { image: id, error } => {
                if let Some(img) = VIEW_PROCESS.on_image_error(id, error) {
                    let args = RawImageArgs::now(img);
                    self.notify_event(RAW_IMAGE_LOAD_ERROR_EVENT.new_update(args), observer);
                }
            }
            // encode results are only forwarded to `VIEW_PROCESS`, no event is notified here.
            Event::ImageEncoded { image: id, format, data } => VIEW_PROCESS.on_image_encoded(id, format, data),
            Event::ImageEncodeError { image: id, format, error } => {
                VIEW_PROCESS.on_image_encode_error(id, format, error);
            }
            Event::FrameImageReady {
                window: w_id,
                frame: frame_id,
                image: image_id,
                selection,
            } => {
                if let Some(img) = VIEW_PROCESS.on_frame_image_ready(image_id) {
                    let args = RawFrameImageReadyArgs::now(img, window_id(w_id), frame_id, selection);
                    self.notify_event(RAW_FRAME_IMAGE_READY_EVENT.new_update(args), observer);
                }
            }

            // accessibility, the event updates are built by `crate::access`.
            Event::AccessInit { window: w_id } => {
                self.notify_event(crate::access::on_access_init(window_id(w_id)), observer);
            }
            Event::AccessCommand {
                window: win_id,
                target: wgt_id,
                command,
            } => {
                if let Some(update) = crate::access::on_access_command(window_id(win_id), WidgetId::from_raw(wgt_id.0), command) {
                    self.notify_event(update, observer);
                }
            }
            Event::AccessDeinit { window: w_id } => {
                self.notify_event(crate::access::on_access_deinit(window_id(w_id)), observer);
            }

            // native dialogs, the response is only forwarded to `VIEW_PROCESS`.
            Event::MsgDialogResponse(id, response) => {
                VIEW_PROCESS.on_message_dlg_response(id, response);
            }
            Event::FileDialogResponse(id, response) => {
                VIEW_PROCESS.on_file_dlg_response(id, response);
            }

            // custom view-process extension event.
            Event::ExtensionEvent(id, payload) => {
                let args = RawExtensionEventArgs::now(id, payload);
                self.notify_event(RAW_EXTENSION_EVENT.new_update(args), observer);
            }

            // system config changes.
            Event::FontsChanged => {
                let args = RawFontChangedArgs::now();
                self.notify_event(RAW_FONT_CHANGED_EVENT.new_update(args), observer);
            }
            Event::FontAaChanged(aa) => {
                let args = RawFontAaChangedArgs::now(aa);
                self.notify_event(RAW_FONT_AA_CHANGED_EVENT.new_update(args), observer);
            }
            Event::MultiClickConfigChanged(cfg) => {
                let args = RawMultiClickConfigChangedArgs::now(cfg);
                self.notify_event(RAW_MULTI_CLICK_CONFIG_CHANGED_EVENT.new_update(args), observer);
            }
            Event::AnimationsConfigChanged(cfg) => {
                // the var service is updated before the raw event is notified.
                VARS_APP.set_sys_animations_enabled(cfg.enabled);
                let args = RawAnimationsConfigChangedArgs::now(cfg);
                self.notify_event(RAW_ANIMATIONS_CONFIG_CHANGED_EVENT.new_update(args), observer);
            }
            Event::KeyRepeatConfigChanged(cfg) => {
                let args = RawKeyRepeatConfigChangedArgs::now(cfg);
                self.notify_event(RAW_KEY_REPEAT_CONFIG_CHANGED_EVENT.new_update(args), observer);
            }
            Event::TouchConfigChanged(cfg) => {
                let args = RawTouchConfigChangedArgs::now(cfg);
                self.notify_event(RAW_TOUCH_CONFIG_CHANGED_EVENT.new_update(args), observer);
            }
            Event::LocaleChanged(cfg) => {
                let args = RawLocaleChangedArgs::now(cfg);
                self.notify_event(RAW_LOCALE_CONFIG_CHANGED_EVENT.new_update(args), observer);
            }
            Event::ColorsConfigChanged(cfg) => {
                let args = RawColorsConfigChangedArgs::now(cfg);
                self.notify_event(RAW_COLORS_CONFIG_CHANGED_EVENT.new_update(args), observer);
            }
            Event::ChromeConfigChanged(cfg) => {
                let args = RawChromeConfigChangedArgs::now(cfg);
                self.notify_event(RAW_CHROME_CONFIG_CHANGED_EVENT.new_update(args), observer);
            }

            // device events, these are not associated with a window.
            Event::DeviceAdded(d_id) => {
                let args = DeviceArgs::now(self.device_id(d_id));
                self.notify_event(DEVICE_ADDED_EVENT.new_update(args), observer);
            }
            Event::DeviceRemoved(d_id) => {
                let args = DeviceArgs::now(self.device_id(d_id));
                self.notify_event(DEVICE_REMOVED_EVENT.new_update(args), observer);
            }
            Event::DeviceMouseMotion { device: d_id, delta } => {
                let args = MouseMotionArgs::now(self.device_id(d_id), delta);
                self.notify_event(MOUSE_MOTION_EVENT.new_update(args), observer);
            }
            Event::DeviceMouseWheel { device: d_id, delta } => {
                let args = MouseWheelArgs::now(self.device_id(d_id), delta);
                self.notify_event(MOUSE_WHEEL_EVENT.new_update(args), observer);
            }
            Event::DeviceMotion { device: d_id, axis, value } => {
                let args = MotionArgs::now(self.device_id(d_id), axis, value);
                self.notify_event(MOTION_EVENT.new_update(args), observer);
            }
            Event::DeviceButton {
                device: d_id,
                button,
                state,
            } => {
                let args = ButtonArgs::now(self.device_id(d_id), button, state);
                self.notify_event(BUTTON_EVENT.new_update(args), observer);
            }
            Event::DeviceKey {
                device: d_id,
                key_code,
                state,
            } => {
                let args = KeyArgs::now(self.device_id(d_id), key_code, state);
                self.notify_event(KEY_EVENT.new_update(args), observer);
            }

            // this is the only arm that uses `notify` on the event instead of `self.notify_event`.
            Event::LowMemory => {
                LOW_MEMORY_EVENT.notify(LowMemoryArgs::now());
            }

            // only logged.
            Event::RecoveredFromComponentPanic { component, recover, panic } => {
                tracing::error!(
                    "view-process recovered from internal component panic\n component: {component}\n recover: {recover}\n```panic\n{panic}\n```"
                );
            }

            // handled by `push_coalesce` before the event is added to the pending list.
            Event::Inited(zng_view_api::Inited { .. }) | Event::Suspended | Event::Disconnected(_) | Event::FrameRendered(_) => {
                unreachable!()
            }
            // any other view event is ignored.
            _ => {}
        }
    }
532
533 fn on_view_rendered_event<O: AppEventObserver>(&mut self, ev: zng_view_api::window::EventFrameRendered, observer: &mut O) {
535 debug_assert!(ev.window != zng_view_api::window::WindowId::INVALID);
536 let window_id = WindowId::from_raw(ev.window.get());
537 let image = ev.frame_image.map(|img| VIEW_PROCESS.on_frame_image(img));
539 let args = crate::view_process::raw_events::RawFrameRenderedArgs::now(window_id, ev.frame, image);
540 self.notify_event(crate::view_process::raw_events::RAW_FRAME_RENDERED_EVENT.new_update(args), observer);
541 }
542
543 pub(crate) fn run_headed(mut self) {
544 let mut observer = ();
545 #[cfg(feature = "dyn_app_extension")]
546 let mut observer = observer.as_dyn();
547
548 self.apply_updates(&mut observer);
549 self.apply_update_events(&mut observer);
550 let mut wait = false;
551 loop {
552 wait = match self.poll_impl(wait, &mut observer) {
553 AppControlFlow::Poll => false,
554 AppControlFlow::Wait => true,
555 AppControlFlow::Exit => break,
556 };
557 }
558 }
559
    /// Handles an app event as soon as it is received from the channel.
    ///
    /// View-process events that affect the `VIEW_PROCESS` state (`FrameRendered`, `Inited`,
    /// `Suspended`, `Disconnected`) are applied immediately. Other view events are added to
    /// `pending_view_events`, first trying to merge with the last pending event. The other app
    /// events are forwarded to `EVENTS` or `UPDATES`.
    ///
    /// # Panics
    ///
    /// Resumes the panic carried by `AppEvent::ResumeUnwind`.
    fn push_coalesce<O: AppEventObserver>(&mut self, ev: AppEvent, observer: &mut O) {
        match ev {
            AppEvent::ViewEvent(ev) => match ev {
                zng_view_api::Event::FrameRendered(ev) => {
                    if ev.window == zng_view_api::window::WindowId::INVALID {
                        tracing::error!("ignored rendered event for invalid window id, {ev:?}");
                        return;
                    }

                    let window = WindowId::from_raw(ev.window.get());

                    // the view-process service is updated immediately, `view_is_busy` depends on it.
                    {
                        if VIEW_PROCESS.is_available() {
                            VIEW_PROCESS.on_frame_rendered(window);
                        }
                    }

                    // debug only check, a second pending frame event for the same window is suspicious.
                    #[cfg(debug_assertions)]
                    if self.pending_view_frame_events.iter().any(|e| e.window == ev.window) {
                        tracing::warn!("window `{window:?}` probably sent a frame request without awaiting renderer idle");
                    }

                    // the raw event is notified later, by `poll_impl`.
                    self.pending_view_frame_events.push(ev);
                }
                zng_view_api::Event::Inited(zng_view_api::Inited {
                    generation,
                    is_respawn,
                    available_monitors,
                    multi_click_config,
                    key_repeat_config,
                    touch_config,
                    font_aa,
                    animations_config,
                    locale_config,
                    colors_config,
                    chrome_config,
                    extensions,
                    ..
                }) => {
                    // notified immediately, not added to the pending events.
                    if is_respawn {
                        VIEW_PROCESS.on_respawned(generation);
                        APP_PROCESS_SV.read().is_suspended.set(false);
                    }

                    VIEW_PROCESS.handle_inited(generation, extensions.clone());

                    let monitors: Vec<_> = available_monitors
                        .into_iter()
                        .map(|(id, info)| (VIEW_PROCESS.monitor_id(id), info))
                        .collect();

                    // NOTE(review): `on_view_event` uses `VARS_APP.set_sys_animations_enabled` for the
                    // config changed event, confirm that setting the var directly here is intended.
                    VARS.animations_enabled().set(animations_config.enabled);

                    let args = crate::view_process::ViewProcessInitedArgs::now(
                        generation,
                        is_respawn,
                        monitors,
                        multi_click_config,
                        key_repeat_config,
                        touch_config,
                        font_aa,
                        animations_config,
                        locale_config,
                        colors_config,
                        chrome_config,
                        extensions,
                    );
                    self.notify_event(VIEW_PROCESS_INITED_EVENT.new_update(args), observer);
                }
                zng_view_api::Event::Suspended => {
                    VIEW_PROCESS.handle_suspended();
                    let args = crate::view_process::ViewProcessSuspendedArgs::now();
                    self.notify_event(VIEW_PROCESS_SUSPENDED_EVENT.new_update(args), observer);
                    // the var is only set after the suspended event handlers have run.
                    APP_PROCESS_SV.read().is_suspended.set(true);
                }
                zng_view_api::Event::Disconnected(vp_gen) => {
                    // only the view-process service is updated, no event is notified here.
                    VIEW_PROCESS.handle_disconnect(vp_gen);
                }
                ev => {
                    // `coalesce` returns the event back in `Err` when it was not merged into `last`.
                    if let Some(last) = self.pending_view_events.last_mut() {
                        match last.coalesce(ev) {
                            Ok(()) => {}
                            Err(ev) => self.pending_view_events.push(ev),
                        }
                    } else {
                        self.pending_view_events.push(ev);
                    }
                }
            },
            AppEvent::Event(ev) => EVENTS.notify(ev.get()),
            AppEvent::Update(op, target) => {
                UPDATES.update_op(op, target);
            }
            // nothing to do, receiving the event already woke the loop.
            AppEvent::CheckUpdate => {}
            AppEvent::ResumeUnwind(p) => std::panic::resume_unwind(p),
        }
    }
660
661 fn has_pending_updates(&mut self) -> bool {
662 !self.pending_view_events.is_empty() || self.pending.has_updates() || UPDATES.has_pending_updates() || !self.receiver.is_empty()
663 }
664
    /// Runs one iteration of the app loop, see `poll_impl`.
    ///
    /// If `wait_app_event` is `true` the call can block until an app event is received or the
    /// next timer deadline elapses. With the `dyn_app_extension` feature the observer is
    /// converted with `as_dyn` before the call.
    pub(crate) fn poll<O: AppEventObserver>(&mut self, wait_app_event: bool, observer: &mut O) -> AppControlFlow {
        #[cfg(feature = "dyn_app_extension")]
        let mut observer = observer.as_dyn();
        #[cfg(feature = "dyn_app_extension")]
        let observer = &mut observer;
        self.poll_impl(wait_app_event, observer)
    }
    /// One iteration of the app loop.
    ///
    /// Receives app events (blocking only if `wait_app_event`), then, if the view-process is not
    /// busy rendering, ticks elapsed timers, notifies the pending view events, applies updates and
    /// update events, and finishes the frame (layout and render).
    ///
    /// Returns `Exit` after the exit is confirmed (and on every call after that), `Poll` if there
    /// is more work pending and `Wait` if the next call can block.
    ///
    /// # Panics
    ///
    /// Panics if the app events channel is disconnected.
    fn poll_impl<O: AppEventObserver>(&mut self, wait_app_event: bool, observer: &mut O) -> AppControlFlow {
        let mut disconnected = false;

        if self.exited {
            return AppControlFlow::Exit;
        }

        if wait_app_event {
            // the span records what ended the wait.
            let idle = tracing::debug_span!("<idle>", ended_by = tracing::field::Empty).entered();

            // while the view-process has pending frames the timer deadline is ignored,
            // the loop blocks until an event is received.
            let timer = if self.view_is_busy() { None } else { self.loop_timer.poll() };
            if let Some(time) = timer {
                match self.receiver.recv_deadline_sp(time) {
                    Ok(ev) => {
                        idle.record("ended_by", "event");
                        drop(idle);
                        self.push_coalesce(ev, observer)
                    }
                    Err(e) => match e {
                        flume::RecvTimeoutError::Timeout => {
                            idle.record("ended_by", "timeout");
                        }
                        flume::RecvTimeoutError::Disconnected => {
                            idle.record("ended_by", "disconnected");
                            disconnected = true
                        }
                    },
                }
            } else {
                match self.receiver.recv() {
                    Ok(ev) => {
                        idle.record("ended_by", "event");
                        drop(idle);
                        self.push_coalesce(ev, observer)
                    }
                    Err(e) => match e {
                        flume::RecvError::Disconnected => {
                            idle.record("ended_by", "disconnected");
                            disconnected = true
                        }
                    },
                }
            }
        }
        // receive everything else that is already in the channel, without blocking.
        loop {
            match self.receiver.try_recv() {
                Ok(ev) => self.push_coalesce(ev, observer),
                Err(e) => match e {
                    flume::TryRecvError::Empty => break,
                    flume::TryRecvError::Disconnected => {
                        disconnected = true;
                        break;
                    }
                },
            }
        }
        if disconnected {
            panic!("app events channel disconnected");
        }

        // nothing is processed while frames are pending, the caller is asked to wait for an event.
        if self.view_is_busy() {
            return AppControlFlow::Wait;
        }

        UPDATES.on_app_awake();

        // updates the loop time, `true` if the registered deadline elapsed.
        let updated_timers = self.loop_timer.awake();
        if updated_timers {
            // the timers can register the next deadline in `loop_timer`.
            UPDATES.update_timers(&mut self.loop_timer);
            self.apply_updates(observer);
        }

        // the list is taken so `self` can be borrowed in the loop, updates are applied after each event.
        let mut events = mem::take(&mut self.pending_view_events);
        for ev in events.drain(..) {
            self.on_view_event(ev, observer);
            self.apply_updates(observer);
        }
        debug_assert!(self.pending_view_events.is_empty());
        // the drained vec is put back to reuse the allocation.
        self.pending_view_events = events;

        let mut events = mem::take(&mut self.pending_view_frame_events);
        for ev in events.drain(..) {
            self.on_view_rendered_event(ev, observer);
        }
        self.pending_view_frame_events = events;

        if self.has_pending_updates() {
            self.apply_updates(observer);
            self.apply_update_events(observer);
        }

        // checked again because `finish_frame` requires a view-process that is not busy.
        if self.view_is_busy() {
            return AppControlFlow::Wait;
        }

        self.finish_frame(observer);

        UPDATES.next_deadline(&mut self.loop_timer);

        if self.extensions.0.exit() {
            UPDATES.on_app_sleep();
            self.exited = true;
            AppControlFlow::Exit
        } else if self.has_pending_updates() || UPDATES.has_pending_layout_or_render() {
            AppControlFlow::Poll
        } else {
            UPDATES.on_app_sleep();
            AppControlFlow::Wait
        }
    }
784
    /// Applies the pending info rebuilds and updates, in a loop.
    ///
    /// Each pass applies the info requests, then the update requests, and notifies the timers
    /// service in between. The loop stops after a pass that applied neither, or when
    /// `LoopMonitor::update` skips the pass and returns `false`.
    fn apply_updates<O: AppEventObserver>(&mut self, observer: &mut O) {
        let _s = tracing::debug_span!("apply_updates").entered();

        let mut run = true;
        while run {
            run = self.loop_monitor.update(|| {
                // `true` if this pass applied an info rebuild or an update.
                let mut any = false;

                self.pending |= UPDATES.apply_info();
                if mem::take(&mut self.pending.info) {
                    any = true;
                    let _s = tracing::debug_span!("info").entered();

                    let mut info_widgets = mem::take(&mut self.pending.info_widgets);

                    let _t = INSTANT_APP.pause_for_update();

                    {
                        let _s = tracing::debug_span!("ext.info").entered();
                        self.extensions.info(&mut info_widgets);
                    }
                    {
                        let _s = tracing::debug_span!("obs.info").entered();
                        observer.info(&mut info_widgets);
                    }
                }

                self.pending |= UPDATES.apply_updates();
                // called in every pass, even if no update is pending.
                TimersService::notify();
                if mem::take(&mut self.pending.update) {
                    any = true;
                    let _s = tracing::debug_span!("update").entered();

                    let mut update_widgets = mem::take(&mut self.pending.update_widgets);

                    let _t = INSTANT_APP.pause_for_update();

                    // preview, UI and then update, in each step extensions before the observer.
                    {
                        let _s = tracing::debug_span!("ext.update_preview").entered();
                        self.extensions.update_preview();
                    }
                    {
                        let _s = tracing::debug_span!("obs.update_preview").entered();
                        observer.update_preview();
                    }
                    UPDATES.on_pre_updates();

                    {
                        let _s = tracing::debug_span!("ext.update_ui").entered();
                        self.extensions.update_ui(&mut update_widgets);
                    }
                    {
                        let _s = tracing::debug_span!("obs.update_ui").entered();
                        observer.update_ui(&mut update_widgets);
                    }

                    {
                        let _s = tracing::debug_span!("ext.update").entered();
                        self.extensions.update();
                    }
                    {
                        let _s = tracing::debug_span!("obs.update").entered();
                        observer.update();
                    }
                    UPDATES.on_updates();
                }

                any
            });
        }
    }
857
    /// Notifies the events in `self.pending.events`, repeats until no more events are pending.
    ///
    /// Each event goes through preview, pre-actions, UI, event and pos-actions, in each handler
    /// step the extensions are visited before `observer`. `apply_updates` is called after each event,
    /// events that get pending during that call are notified by the next iteration of the outer loop.
    fn apply_update_events<O: AppEventObserver>(&mut self, observer: &mut O) {
        let _s = tracing::debug_span!("apply_update_events").entered();

        loop {
            // collected to an owned list because `self` is borrowed mutably in the inner loop.
            let events: Vec<_> = self.pending.events.drain(..).collect();
            if events.is_empty() {
                break;
            }
            for mut update in events {
                let _s = tracing::debug_span!("update_event", ?update).entered();

                self.loop_monitor.maybe_trace(|| {
                    let _t = INSTANT_APP.pause_for_update();

                    {
                        let _s = tracing::debug_span!("ext.event_preview").entered();
                        self.extensions.event_preview(&mut update);
                    }
                    {
                        let _s = tracing::debug_span!("obs.event_preview").entered();
                        observer.event_preview(&mut update);
                    }
                    update.call_pre_actions();

                    {
                        let _s = tracing::debug_span!("ext.event_ui").entered();
                        self.extensions.event_ui(&mut update);
                    }
                    {
                        let _s = tracing::debug_span!("obs.event_ui").entered();
                        observer.event_ui(&mut update);
                    }
                    {
                        let _s = tracing::debug_span!("ext.event").entered();
                        self.extensions.event(&mut update);
                    }
                    {
                        let _s = tracing::debug_span!("obs.event").entered();
                        observer.event(&mut update);
                    }
                    update.call_pos_actions();
                });

                self.apply_updates(observer);
            }
        }
    }
906
907 fn view_is_busy(&mut self) -> bool {
908 VIEW_PROCESS.is_available() && VIEW_PROCESS.pending_frames() > 0
909 }
910
    /// Applies the pending layout and render requests and notifies the loop monitor that a
    /// frame finished.
    ///
    /// Layout repeats while a layout is pending, updates requested during a layout pass are
    /// applied before the layout and render requests are collected again. Render runs at most
    /// once, after all layout passes.
    ///
    /// In debug builds asserts that the view-process is not busy.
    fn finish_frame<O: AppEventObserver>(&mut self, observer: &mut O) {
        debug_assert!(!self.view_is_busy());

        self.pending |= UPDATES.apply_layout_render();

        while mem::take(&mut self.pending.layout) {
            let _s = tracing::debug_span!("apply_layout").entered();

            let mut layout_widgets = mem::take(&mut self.pending.layout_widgets);

            self.loop_monitor.maybe_trace(|| {
                let _t = INSTANT_APP.pause_for_update();

                {
                    let _s = tracing::debug_span!("ext.layout").entered();
                    self.extensions.layout(&mut layout_widgets);
                }
                {
                    let _s = tracing::debug_span!("obs.layout").entered();
                    observer.layout(&mut layout_widgets);
                }
            });

            // the layout pass can request updates and another layout.
            self.apply_updates(observer);
            self.pending |= UPDATES.apply_layout_render();
        }

        if mem::take(&mut self.pending.render) {
            let _s = tracing::debug_span!("apply_render").entered();

            let mut render_widgets = mem::take(&mut self.pending.render_widgets);
            let mut render_update_widgets = mem::take(&mut self.pending.render_update_widgets);

            let _t = INSTANT_APP.pause_for_update();

            {
                let _s = tracing::debug_span!("ext.render").entered();
                self.extensions.render(&mut render_widgets, &mut render_update_widgets);
            }
            {
                let _s = tracing::debug_span!("obs.render").entered();
                observer.render(&mut render_widgets, &mut render_update_widgets);
            }
        }

        // called even if nothing rendered.
        self.loop_monitor.finish_frame();
    }
959}
/// Deinits the app when the loop state is dropped.
impl<E: AppExtension> Drop for RunningApp<E> {
    fn drop(&mut self) {
        let _s = tracing::debug_span!("ext.deinit").entered();
        // the extensions are deinited before the view-process is asked to exit.
        self.extensions.deinit();
        VIEW_PROCESS.exit();
    }
}
967
/// Arguments for [`on_app_start`] handlers.
///
/// Carries no data, the private field only prevents construction outside this module.
pub struct AppStartArgs {
    _private: (),
}
975
976pub fn on_app_start(handler: impl FnMut(&AppStartArgs) + Send + 'static) {
982 zng_unique_id::hot_static_ref!(ON_APP_START).lock().push(Box::new(handler))
983}
// Handlers registered by `on_app_start`, called by `RunningApp::start` in insertion order.
zng_unique_id::hot_static! {
    static ON_APP_START: Mutex<Vec<AppStartHandler>> = Mutex::new(vec![]);
}
/// Boxed handler stored in `ON_APP_START`.
type AppStartHandler = Box<dyn FnMut(&AppStartArgs) + Send + 'static>;
988
/// App loop timer: the time of the current loop iteration and the nearest deadline the loop
/// must wake for.
#[derive(Debug)]
pub(crate) struct LoopTimer {
    /// Time sampled at construction and at every `awake`, used for all `elapsed` checks.
    now: DInstant,
    /// Minimum of the deadlines registered since the last elapsed deadline, if any.
    deadline: Option<Deadline>,
}
/// The default timer starts at the current `INSTANT` time with no deadline registered.
impl Default for LoopTimer {
    fn default() -> Self {
        Self {
            now: INSTANT.now(),
            deadline: None,
        }
    }
}
1003impl LoopTimer {
1004 pub fn elapsed(&mut self, deadline: Deadline) -> bool {
1007 if deadline.0 <= self.now {
1008 true
1009 } else {
1010 self.register(deadline);
1011 false
1012 }
1013 }
1014
1015 pub fn register(&mut self, deadline: Deadline) {
1017 if let Some(d) = &mut self.deadline {
1018 if deadline < *d {
1019 *d = deadline;
1020 }
1021 } else {
1022 self.deadline = Some(deadline)
1023 }
1024 }
1025
1026 pub(crate) fn poll(&mut self) -> Option<Deadline> {
1028 self.deadline
1029 }
1030
1031 pub(crate) fn awake(&mut self) -> bool {
1033 self.now = INSTANT.now();
1034 if let Some(d) = self.deadline {
1035 if d.0 <= self.now {
1036 self.deadline = None;
1037 return true;
1038 }
1039 }
1040 false
1041 }
1042
1043 pub fn now(&self) -> DInstant {
1045 self.now
1046 }
1047}
/// Exposes the loop timer to the var animations.
///
/// Every method forwards to the inherent method of the same name defined on `LoopTimer`. This
/// is not a recursive call, inherent methods have priority over trait methods in method
/// resolution.
impl zng_var::animation::AnimationTimer for LoopTimer {
    fn elapsed(&mut self, deadline: Deadline) -> bool {
        self.elapsed(deadline)
    }

    fn register(&mut self, deadline: Deadline) {
        self.register(deadline)
    }

    fn now(&self) -> DInstant {
        self.now()
    }
}
1061
/// Counts update passes to detect an app that keeps updating without ever finishing a frame.
#[derive(Default)]
struct LoopMonitor {
    /// Update passes requested since the last reset in `finish_frame`.
    update_count: u16,
    /// Set when the count reaches 1000 and the monitor starts skipping passes.
    skipped: bool,
    /// Update requests traced while the count is in `500..1000`, used in the error log.
    trace: Vec<UpdateTrace>,
}
1068impl LoopMonitor {
1069 pub fn update(&mut self, update_once: impl FnOnce() -> bool) -> bool {
1071 self.update_count += 1;
1072
1073 if self.update_count < 500 {
1074 update_once()
1075 } else if self.update_count < 1000 {
1076 UpdatesTrace::collect_trace(&mut self.trace, update_once)
1077 } else if self.update_count == 1000 {
1078 self.skipped = true;
1079 let trace = UpdatesTrace::format_trace(mem::take(&mut self.trace));
1080 tracing::error!(
1081 "updated 1000 times without rendering, probably stuck in an infinite loop\n\
1082 will start skipping updates to render and poll system events\n\
1083 top 20 most frequent update requests (in 500 cycles):\n\
1084 {trace}\n\
1085 you can use `UpdatesTraceUiNodeExt` and `updates_trace_event` to refine the trace"
1086 );
1087 false
1088 } else if self.update_count == 1500 {
1089 self.update_count = 1001;
1090 false
1091 } else {
1092 update_once()
1093 }
1094 }
1095
1096 pub fn maybe_trace(&mut self, notify_once: impl FnOnce()) {
1097 if (500..1000).contains(&self.update_count) {
1098 UpdatesTrace::collect_trace(&mut self.trace, notify_once);
1099 } else {
1100 notify_once();
1101 }
1102 }
1103
1104 pub fn finish_frame(&mut self) {
1105 if !self.skipped {
1106 self.skipped = false;
1107 self.update_count = 0;
1108 self.trace = vec![];
1109 }
1110 }
1111}
1112
1113impl APP {
1114 pub fn exit(&self) -> ResponseVar<ExitCancelled> {
1123 APP_PROCESS_SV.write().exit()
1124 }
1125
1126 pub fn is_suspended(&self) -> ReadOnlyArcVar<bool> {
1134 APP_PROCESS_SV.read().is_suspended.read_only()
1135 }
1136}
1137
1138impl APP {
1142 pub fn pause_time_for_update(&self) -> ArcVar<bool> {
1148 APP_PROCESS_SV.read().pause_time_for_updates.clone()
1149 }
1150
1151 pub fn start_manual_time(&self) {
1159 INSTANT_APP.set_mode(InstantMode::Manual);
1160 INSTANT_APP.set_now(INSTANT.now());
1161 UPDATES.update(None);
1162 }
1163
1164 pub fn advance_manual_time(&self, advance: Duration) {
1175 INSTANT_APP.advance_now(advance);
1176 UPDATES.update(None);
1177 }
1178
1179 pub fn set_manual_time(&self, now: DInstant) {
1188 INSTANT_APP.set_now(now);
1189 UPDATES.update(None);
1190 }
1191
1192 pub fn end_manual_time(&self) {
1194 INSTANT_APP.set_mode(match APP.pause_time_for_update().get() {
1195 true => InstantMode::UpdatePaused,
1196 false => InstantMode::Now,
1197 });
1198 UPDATES.update(None);
1199 }
1200}
1201
// App exit command. `AppIntrinsic` subscribes to it and calls `APP.exit()` when an enabled
// request is received.
command! {
    pub static EXIT_CMD = {
        l10n!: true,
        name: "Exit",
        info: "Close all windows and exit",
        shortcut: shortcut!(Exit),
    };
}
1213
/// Response value of an exit request that was cancelled.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ExitCancelled;
impl fmt::Display for ExitCancelled {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("exit request cancelled")
    }
}
1224
/// Built-in app extension that handles the exit command and exit requests.
struct AppIntrinsic {
    /// Handle created by `EXIT_CMD.subscribe(true)`.
    exit_handle: CommandHandle,
    /// Exit request that was already notified in `EXIT_REQUESTED_EVENT` and is waiting to be
    /// resolved by `exit`.
    pending_exit: Option<PendingExit>,
}
/// State of an exit request between the event notification and its resolution.
struct PendingExit {
    /// Propagation handle of the notified `ExitRequestedArgs`, if stopped the exit is cancelled.
    handle: EventPropagationHandle,
    /// Responder used to answer the request with `ExitCancelled`.
    response: ResponderVar<ExitCancelled>,
}
1233impl AppIntrinsic {
1234 pub(super) fn pre_init(
1236 is_headed: bool,
1237 with_renderer: bool,
1238 view_process_exe: PathBuf,
1239 view_process_env: HashMap<Txt, Txt>,
1240 device_events: bool,
1241 ) -> Self {
1242 APP_PROCESS_SV
1243 .read()
1244 .pause_time_for_updates
1245 .hook(|a| {
1246 if !matches!(INSTANT.mode(), zng_time::InstantMode::Manual) {
1247 if *a.value() {
1248 INSTANT_APP.set_mode(InstantMode::UpdatePaused);
1249 } else {
1250 INSTANT_APP.set_mode(InstantMode::Now);
1251 }
1252 }
1253 true
1254 })
1255 .perm();
1256
1257 if is_headed {
1258 debug_assert!(with_renderer);
1259
1260 let view_evs_sender = UPDATES.sender();
1261 VIEW_PROCESS.start(view_process_exe, view_process_env, device_events, false, move |ev| {
1262 let _ = view_evs_sender.send_view_event(ev);
1263 });
1264 } else if with_renderer {
1265 let view_evs_sender = UPDATES.sender();
1266 VIEW_PROCESS.start(view_process_exe, view_process_env, false, true, move |ev| {
1267 let _ = view_evs_sender.send_view_event(ev);
1268 });
1269 }
1270
1271 AppIntrinsic {
1272 exit_handle: EXIT_CMD.subscribe(true),
1273 pending_exit: None,
1274 }
1275 }
1276
1277 pub(super) fn exit(&mut self) -> bool {
1279 if let Some(pending) = self.pending_exit.take() {
1280 if pending.handle.is_stopped() {
1281 pending.response.respond(ExitCancelled);
1282 false
1283 } else {
1284 true
1285 }
1286 } else {
1287 false
1288 }
1289 }
1290}
1291impl AppExtension for AppIntrinsic {
1292 fn event_preview(&mut self, update: &mut EventUpdate) {
1293 if let Some(args) = EXIT_CMD.on(update) {
1294 args.handle_enabled(&self.exit_handle, |_| {
1295 APP.exit();
1296 });
1297 }
1298 }
1299
1300 fn update(&mut self) {
1301 if let Some(response) = APP_PROCESS_SV.write().take_requests() {
1302 let args = ExitRequestedArgs::now();
1303 self.pending_exit = Some(PendingExit {
1304 handle: args.propagation().clone(),
1305 response,
1306 });
1307 EXIT_REQUESTED_EVENT.notify(args);
1308 }
1309 }
1310}
1311
1312pub(crate) fn assert_not_view_process() {
1313 if zng_view_api::ViewConfig::from_env().is_some() {
1314 panic!("cannot start App in view-process");
1315 }
1316}
1317
/// Starts the deadlock monitor thread, only the first call has an effect.
///
/// The thread polls the `parking_lot` deadlock detector every 10 seconds and prints a report
/// with the thread ID and backtrace of each deadlocked thread to stderr. With the
/// `"test_util"` feature the process also exits with code `-1` after the report.
#[cfg(feature = "deadlock_detection")]
pub(crate) fn check_deadlock() {
    use parking_lot::deadlock;
    use std::{
        sync::atomic::{self, AtomicBool},
        thread,
        time::*,
    };

    static CHECK_RUNNING: AtomicBool = AtomicBool::new(false);

    let already_running = CHECK_RUNNING.swap(true, atomic::Ordering::SeqCst);
    if already_running {
        return;
    }

    thread::spawn(|| {
        use std::fmt::Write;

        loop {
            thread::sleep(Duration::from_secs(10));

            let found = deadlock::check_deadlock();
            if found.is_empty() {
                continue;
            }

            let mut report = String::new();

            let _ = writeln!(&mut report, "{} deadlocks detected", found.len());
            for (index, group) in found.iter().enumerate() {
                let _ = writeln!(&mut report, "Deadlock #{}, {} threads", index, group.len());
                for thread_info in group {
                    let _ = writeln!(&mut report, "Thread Id {:#?}", thread_info.thread_id());
                    let _ = writeln!(&mut report, "{:#?}", thread_info.backtrace());
                }
            }

            #[cfg(not(feature = "test_util"))]
            eprint!("{report}");

            #[cfg(feature = "test_util")]
            {
                use std::io::Write;
                // write errors are ignored, the process exits next
                let _ = write!(&mut std::io::stderr(), "{report}");
                zng_env::exit(-1);
            }
        }
    });
}
/// No-op used when the `"deadlock_detection"` feature is disabled.
#[cfg(not(feature = "deadlock_detection"))]
pub(crate) fn check_deadlock() {}
1370
// App-local service state behind `APP.exit`, `APP.is_suspended` and `APP.pause_time_for_update`.
app_local! {
    pub(super) static APP_PROCESS_SV: AppProcessService = AppProcessService {
        // no exit request pending
        exit_requests: None,
        // set by `set_extensions`
        extensions: None,
        device_events: false,
        // time pauses for updates by default
        pause_time_for_updates: zng_var::var(true),
        is_suspended: zng_var::var(false),
    };
}
1380
/// State of the app process service.
pub(super) struct AppProcessService {
    /// Responder of the exit request that was not yet taken by `take_requests`.
    exit_requests: Option<ResponderVar<ExitCancelled>>,
    /// Extensions info set by `set_extensions`.
    extensions: Option<Arc<AppExtensionsInfo>>,
    /// Device events flag set by `set_extensions`.
    pub(super) device_events: bool,
    /// Variable returned by `APP.pause_time_for_update`.
    pause_time_for_updates: ArcVar<bool>,
    /// Variable exposed read-only by `APP.is_suspended`.
    is_suspended: ArcVar<bool>,
}
1388impl AppProcessService {
1389 pub(super) fn take_requests(&mut self) -> Option<ResponderVar<ExitCancelled>> {
1390 self.exit_requests.take()
1391 }
1392
1393 fn exit(&mut self) -> ResponseVar<ExitCancelled> {
1394 if let Some(r) = &self.exit_requests {
1395 r.response_var()
1396 } else {
1397 let (responder, response) = response_var();
1398 self.exit_requests = Some(responder);
1399 UPDATES.update(None);
1400 response
1401 }
1402 }
1403
1404 pub(super) fn extensions(&self) -> Arc<AppExtensionsInfo> {
1405 self.extensions
1406 .clone()
1407 .unwrap_or_else(|| Arc::new(AppExtensionsInfo { infos: vec![] }))
1408 }
1409
1410 pub(super) fn set_extensions(&mut self, info: AppExtensionsInfo, device_events: bool) {
1411 self.extensions = Some(Arc::new(info));
1412 self.device_events = device_events;
1413 }
1414}
1415
/// Messages received by the app main loop channel.
#[derive(Debug)]
// NOTE(review): the variants are kept unboxed despite the size difference, reason not visible here.
#[allow(clippy::large_enum_variant)]
pub(crate) enum AppEvent {
    /// Event received from the view-process, sent by `AppEventSender::send_view_event`.
    ViewEvent(zng_view_api::Event),
    /// Event update message, sent by `AppEventSender::send_event`.
    Event(crate::event::EventUpdateMsg),
    /// Update request with an optional target widget, sent by `AppEventSender::send_update` and app wakers.
    Update(UpdateOp, Option<WidgetId>),
    /// Panic payload, sent by `AppEventSender::send_resume_unwind`.
    ResumeUnwind(PanicPayload),
    /// Sent by `AppEventSender::send_check_update`.
    CheckUpdate,
}
1431
/// Cloneable sender of [`AppEvent`] messages to the app main loop channel.
#[derive(Clone)]
pub struct AppEventSender(flume::Sender<AppEvent>);
1439impl AppEventSender {
1440 pub(crate) fn new() -> (Self, flume::Receiver<AppEvent>) {
1441 let (sender, receiver) = flume::unbounded();
1442 (Self(sender), receiver)
1443 }
1444
1445 #[allow(clippy::result_large_err)] fn send_app_event(&self, event: AppEvent) -> Result<(), AppChannelError> {
1447 self.0.send(event).map_err(|_| AppChannelError::Disconnected)
1448 }
1449
1450 #[allow(clippy::result_large_err)]
1451 fn send_view_event(&self, event: zng_view_api::Event) -> Result<(), AppChannelError> {
1452 self.0.send(AppEvent::ViewEvent(event)).map_err(|_| AppChannelError::Disconnected)
1453 }
1454
1455 pub fn send_update(&self, op: UpdateOp, target: impl Into<Option<WidgetId>>) -> Result<(), AppChannelError> {
1457 UpdatesTrace::log_update();
1458 self.send_app_event(AppEvent::Update(op, target.into()))
1459 .map_err(|_| AppChannelError::Disconnected)
1460 }
1461
1462 pub(crate) fn send_event(&self, event: crate::event::EventUpdateMsg) -> Result<(), AppChannelError> {
1464 self.send_app_event(AppEvent::Event(event))
1465 .map_err(|_| AppChannelError::Disconnected)
1466 }
1467
1468 pub fn send_resume_unwind(&self, payload: PanicPayload) -> Result<(), AppChannelError> {
1470 self.send_app_event(AppEvent::ResumeUnwind(payload))
1471 .map_err(|_| AppChannelError::Disconnected)
1472 }
1473
1474 pub(crate) fn send_check_update(&self) -> Result<(), AppChannelError> {
1476 self.send_app_event(AppEvent::CheckUpdate)
1477 .map_err(|_| AppChannelError::Disconnected)
1478 }
1479
1480 pub fn waker(&self, target: impl Into<Option<WidgetId>>) -> Waker {
1482 Arc::new(AppWaker(self.0.clone(), target.into())).into()
1483 }
1484
1485 pub fn ext_channel<T>(&self) -> (AppExtSender<T>, AppExtReceiver<T>) {
1487 let (sender, receiver) = flume::unbounded();
1488
1489 (
1490 AppExtSender {
1491 update: self.clone(),
1492 sender,
1493 },
1494 AppExtReceiver { receiver },
1495 )
1496 }
1497
1498 pub fn ext_channel_bounded<T>(&self, cap: usize) -> (AppExtSender<T>, AppExtReceiver<T>) {
1500 let (sender, receiver) = flume::bounded(cap);
1501
1502 (
1503 AppExtSender {
1504 update: self.clone(),
1505 sender,
1506 },
1507 AppExtReceiver { receiver },
1508 )
1509 }
1510}
1511
1512struct AppWaker(flume::Sender<AppEvent>, Option<WidgetId>);
1513impl std::task::Wake for AppWaker {
1514 fn wake(self: std::sync::Arc<Self>) {
1515 self.wake_by_ref()
1516 }
1517 fn wake_by_ref(self: &Arc<Self>) {
1518 let _ = self.0.send(AppEvent::Update(UpdateOp::Update, self.1));
1519 }
1520}
1521
/// Boxed panic payload carried by `AppEvent::ResumeUnwind`.
type PanicPayload = Box<dyn std::any::Any + Send + 'static>;
1523
/// Sender end of a channel created by `AppEventSender::ext_channel` or
/// `AppEventSender::ext_channel_bounded`.
pub struct AppExtSender<T> {
    /// Used to request an app update before each message is sent.
    update: AppEventSender,
    /// Channel that carries the messages.
    sender: flume::Sender<T>,
}
1531impl<T> Clone for AppExtSender<T> {
1532 fn clone(&self) -> Self {
1533 Self {
1534 update: self.update.clone(),
1535 sender: self.sender.clone(),
1536 }
1537 }
1538}
1539impl<T: Send> AppExtSender<T> {
1540 pub fn send(&self, msg: T) -> Result<(), AppChannelError> {
1542 match self.update.send_update(UpdateOp::Update, None) {
1543 Ok(()) => self.sender.send(msg).map_err(|_| AppChannelError::Disconnected),
1544 Err(_) => Err(AppChannelError::Disconnected),
1545 }
1546 }
1547
1548 pub fn send_timeout(&self, msg: T, dur: Duration) -> Result<(), AppChannelError> {
1550 match self.update.send_update(UpdateOp::Update, None) {
1551 Ok(()) => self.sender.send_timeout(msg, dur).map_err(|e| match e {
1552 flume::SendTimeoutError::Timeout(_) => AppChannelError::Timeout,
1553 flume::SendTimeoutError::Disconnected(_) => AppChannelError::Disconnected,
1554 }),
1555 Err(_) => Err(AppChannelError::Disconnected),
1556 }
1557 }
1558
1559 pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), AppChannelError> {
1561 match self.update.send_update(UpdateOp::Update, None) {
1562 Ok(()) => self.sender.send_deadline(msg, deadline).map_err(|e| match e {
1563 flume::SendTimeoutError::Timeout(_) => AppChannelError::Timeout,
1564 flume::SendTimeoutError::Disconnected(_) => AppChannelError::Disconnected,
1565 }),
1566 Err(_) => Err(AppChannelError::Disconnected),
1567 }
1568 }
1569}
1570
/// Receiver end of a channel created by `AppEventSender::ext_channel` or
/// `AppEventSender::ext_channel_bounded`.
pub struct AppExtReceiver<T> {
    /// Channel that carries the messages.
    receiver: flume::Receiver<T>,
}
1577impl<T> Clone for AppExtReceiver<T> {
1578 fn clone(&self) -> Self {
1579 Self {
1580 receiver: self.receiver.clone(),
1581 }
1582 }
1583}
1584impl<T> AppExtReceiver<T> {
1585 pub fn try_recv(&self) -> Result<T, Option<AppChannelError>> {
1590 self.receiver.try_recv().map_err(|e| match e {
1591 flume::TryRecvError::Empty => None,
1592 flume::TryRecvError::Disconnected => Some(AppChannelError::Disconnected),
1593 })
1594 }
1595}
1596
/// Error returned by the app channels when a message cannot be sent or received.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum AppChannelError {
    /// The other end of the channel is gone.
    Disconnected,
    /// The timeout or deadline elapsed before the message was sent or received.
    Timeout,
}
impl fmt::Display for AppChannelError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The same variants are returned by send and by receive operations, so the text
        // must not assume a direction.
        let text = match self {
            AppChannelError::Disconnected => "cannot send or receive because the channel disconnected",
            AppChannelError::Timeout => "deadline elapsed before message could be sent/received",
        };
        f.write_str(text)
    }
}
impl std::error::Error for AppChannelError {}
1615impl From<flume::RecvTimeoutError> for AppChannelError {
1616 fn from(value: flume::RecvTimeoutError) -> Self {
1617 match value {
1618 flume::RecvTimeoutError::Timeout => AppChannelError::Timeout,
1619 flume::RecvTimeoutError::Disconnected => AppChannelError::Disconnected,
1620 }
1621 }
1622}
1623
// Arguments for `EXIT_REQUESTED_EVENT`. The args declare no custom fields; stopping their
// propagation cancels the exit request (see `AppIntrinsic::exit`).
event_args! {
    pub struct ExitRequestedArgs {

        ..

        // Targets the full search, no specific widget.
        fn delivery_list(&self, list: &mut UpdateDeliveryList) {
            list.search_all()
        }
    }
}
1638
// Event notified by `AppIntrinsic::update` when an exit request is taken from the service.
event! {
    pub static EXIT_REQUESTED_EVENT: ExitRequestedArgs;
}
1649
/// Extension of the flume receiver with a deadline receive that sleeps and then spins.
trait ReceiverExt<T> {
    /// Receives a message or times out at `deadline`, spinning for the last milliseconds of
    /// the wait instead of blocking.
    fn recv_deadline_sp(&self, deadline: Deadline) -> Result<T, flume::RecvTimeoutError>;
}
1655
/// Margin subtracted from blocking waits in `recv_deadline_sp`, 20ms on Windows and 10ms on
/// other systems. Presumably the worst expected oversleep of a blocking receive (confirm).
const WORST_SLEEP_ERR: Duration = Duration::from_millis(if cfg!(windows) { 20 } else { 10 });
/// Margin subtracted from the `try_recv` spin in `recv_deadline_sp`, 2ms on Windows and 1ms
/// on other systems.
const WORST_SPIN_ERR: Duration = Duration::from_millis(if cfg!(windows) { 2 } else { 1 });
1658
impl<T> ReceiverExt<T> for flume::Receiver<T> {
    /// Receives a message, giving up at `deadline`.
    ///
    /// Each loop iteration measures the time left until the deadline and picks a wait strategy:
    ///
    /// * Manual time mode: blocking receive with a timeout of the time left minus `WORST_SLEEP_ERR`.
    /// * More than `WORST_SLEEP_ERR` left: blocking receive that ends `WORST_SLEEP_ERR` before the deadline.
    /// * More than `WORST_SPIN_ERR` left: `try_recv` spin, yielding the thread, until `WORST_SPIN_ERR` before the deadline.
    /// * Otherwise: yields the thread until the deadline elapses, without checking the channel.
    ///
    /// Returns the message, `Disconnected`, or `Timeout` once the deadline is reached.
    fn recv_deadline_sp(&self, deadline: Deadline) -> Result<T, flume::RecvTimeoutError> {
        loop {
            // `None` means the deadline is already in the past
            if let Some(d) = deadline.0.checked_duration_since(INSTANT.now()) {
                if matches!(INSTANT.mode(), zng_time::InstantMode::Manual) {
                    // NOTE(review): uses a relative timeout instead of a deadline instant,
                    // presumably because manual app time does not follow the system clock (confirm).
                    match self.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
                        // re-evaluate the time left
                        Err(flume::RecvTimeoutError::Timeout) => continue,
                        // message received or channel disconnected
                        interrupt => return interrupt,
                    }
                } else if d > WORST_SLEEP_ERR {
                    // block, but wake before the deadline to finish the wait by spinning
                    #[cfg(not(target_arch = "wasm32"))]
                    match self.recv_deadline(deadline.0.checked_sub(WORST_SLEEP_ERR).unwrap().into()) {
                        Err(flume::RecvTimeoutError::Timeout) => continue,
                        interrupt => return interrupt,
                    }

                    // wasm32 build uses the relative timeout form
                    #[cfg(target_arch = "wasm32")]
                    match self.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
                        Err(flume::RecvTimeoutError::Timeout) => continue,
                        interrupt => return interrupt,
                    }
                } else if d > WORST_SPIN_ERR {
                    let spin_deadline = Deadline(deadline.0.checked_sub(WORST_SPIN_ERR).unwrap());

                    // poll the channel, yielding between attempts
                    while !spin_deadline.has_elapsed() {
                        match self.try_recv() {
                            Err(flume::TryRecvError::Empty) => std::thread::yield_now(),
                            Err(flume::TryRecvError::Disconnected) => return Err(flume::RecvTimeoutError::Disconnected),
                            Ok(msg) => return Ok(msg),
                        }
                    }
                    // falls to the final branch in the next iteration
                    continue;
                } else {
                    // last stretch, only waits for the deadline, the channel is not checked
                    while !deadline.has_elapsed() {
                        std::thread::yield_now();
                    }
                    return Err(flume::RecvTimeoutError::Timeout);
                }
            } else {
                return Err(flume::RecvTimeoutError::Timeout);
            }
        }
    }
}