1use std::{
2 collections::HashMap,
3 fmt, mem,
4 path::PathBuf,
5 sync::Arc,
6 task::Waker,
7 time::{Duration, Instant},
8};
9
10use crate::Deadline;
11use parking_lot::Mutex;
12use zng_app_context::{AppScope, app_local};
13use zng_task::DEADLINE_APP;
14use zng_time::{INSTANT_APP, InstantMode};
15use zng_txt::Txt;
16use zng_var::{ResponderVar, ResponseVar, VARS_APP, Var, response_var};
17use zng_view_api::{DeviceEventsFilter, raw_input::InputDeviceEvent};
18
19use crate::{
20 APP, AppControlFlow, AppEventObserver, AppExtension, AppExtensionsInfo, DInstant, INSTANT,
21 event::{AnyEventArgs, CommandHandle, CommandInfoExt, CommandNameExt, EVENTS, EventPropagationHandle, command, event},
22 event_args,
23 shortcut::CommandShortcutExt,
24 shortcut::shortcut,
25 timer::TimersService,
26 update::{
27 ContextUpdates, EventUpdate, InfoUpdates, LayoutUpdates, RenderUpdates, UPDATES, UpdateOp, UpdateTrace, UpdatesTrace, WidgetUpdates,
28 },
29 view_process::{raw_device_events::InputDeviceId, *},
30 widget::WidgetId,
31 window::WindowId,
32};
33
/// Represents a running app controlled by an external event loop.
///
/// Created by [`RunningApp::start`] and driven by repeated calls to `poll` (or by `run_headed`).
pub(crate) struct RunningApp<E: AppExtension> {
    /// The built-in [`AppIntrinsic`] extension followed by the extensions provided to `start`.
    extensions: (AppIntrinsic, E),

    /// Receiving end of the app event channel created in `start`.
    receiver: flume::Receiver<AppEvent>,

    /// Current loop time and the nearest deadline the loop must wake up for.
    loop_timer: LoopTimer,
    /// Counts update cycles to detect and break out of update loops that never reach render.
    loop_monitor: LoopMonitor,

    /// View-process events queued (and coalesced) by `push_coalesce`, consumed by `poll_impl`.
    pending_view_events: Vec<zng_view_api::Event>,
    /// Frame rendered events queued by `push_coalesce`, consumed by `poll_impl` after the other view events.
    pending_view_frame_events: Vec<zng_view_api::window::EventFrameRendered>,
    /// Update requests collected from `UPDATES` that are not applied yet.
    pending: ContextUpdates,

    /// Set by `poll_impl` once the intrinsic extension confirms the exit, after this every poll returns `Exit`.
    exited: bool,

    // Held for the entire lifetime of the app, declared last so it is the last field dropped.
    // NOTE(review): presumably the scope unloads the app context on drop, confirm in `AppScope`.
    _scope: AppScope,
}
52impl<E: AppExtension> RunningApp<E> {
    /// Starts the app.
    ///
    /// Connects the global services to the new app event channel, registers and inits the
    /// `extensions`, calls every handler registered with `on_app_start` and returns the
    /// app, ready to be polled.
    ///
    /// The `scope` is only stored in the returned value. The `is_headed`, `with_renderer`,
    /// `view_process_exe` and `view_process_env` values are forwarded to `AppIntrinsic::pre_init`,
    /// if `view_process_exe` is `None` the current executable is used (outside of wasm builds).
    pub(crate) fn start(
        scope: AppScope,
        mut extensions: E,
        is_headed: bool,
        with_renderer: bool,
        view_process_exe: Option<PathBuf>,
        view_process_env: HashMap<Txt, Txt>,
    ) -> Self {
        let _s = tracing::debug_span!("APP::start").entered();

        let (sender, receiver) = AppEventSender::new();

        // the update service uses the sender to awake this app.
        UPDATES.init(sender);

        // hook the other crates into the app update cycle.
        fn app_waker() {
            UPDATES.update(None);
        }
        VARS_APP.init_app_waker(app_waker);
        VARS_APP.init_modify_trace(UpdatesTrace::log_var);
        DEADLINE_APP.init_deadline_service(crate::timer::deadline_service);
        zng_var::animation::TRANSITIONABLE_APP.init_rgba_lerp(zng_color::lerp_rgba);

        // collect the extensions info, with the app time paused during the call.
        let mut info = AppExtensionsInfo::start();
        {
            let _t = INSTANT_APP.pause_for_update();
            extensions.register(&mut info);
        }

        {
            // inner scope releases the service write lock before the next steps.
            let mut sv = APP_PROCESS_SV.write();
            sv.set_extensions(info);
        }

        // only checked when this executable is also going to be the view-process.
        if with_renderer && view_process_exe.is_none() {
            zng_env::assert_inited();
        }

        #[cfg(not(target_arch = "wasm32"))]
        let view_process_exe = view_process_exe.unwrap_or_else(|| std::env::current_exe().expect("current_exe"));
        // in wasm builds the provided path is always replaced by a placeholder.
        #[cfg(target_arch = "wasm32")]
        let view_process_exe = std::path::PathBuf::from("<wasm>");

        // the intrinsic extension is pre-inited before the other extensions init.
        let process = AppIntrinsic::pre_init(is_headed, with_renderer, view_process_exe, view_process_env);

        {
            let _s = tracing::debug_span!("extensions.init").entered();
            extensions.init();
        }

        // NOTE(review): the registry lock is held while the handlers run, a handler that calls
        // `on_app_start` would presumably deadlock, confirm that this is acceptable.
        let args = AppStartArgs { _private: () };
        for h in zng_unique_id::hot_static_ref!(ON_APP_START).lock().iter_mut() {
            h(&args)
        }

        RunningApp {
            extensions: (process, extensions),

            receiver,

            loop_timer: LoopTimer::default(),
            loop_monitor: LoopMonitor::default(),

            // pre-allocated, `poll_impl` reuses these buffers on every iteration.
            pending_view_events: Vec::with_capacity(100),
            pending_view_frame_events: Vec::with_capacity(5),
            pending: ContextUpdates {
                events: Vec::with_capacity(100),
                update: false,
                info: false,
                layout: false,
                render: false,
                update_widgets: WidgetUpdates::default(),
                info_widgets: InfoUpdates::default(),
                layout_widgets: LayoutUpdates::default(),
                render_widgets: RenderUpdates::default(),
                render_update_widgets: RenderUpdates::default(),
            },
            exited: false,

            _scope: scope,
        }
    }
134
    /// Returns `true` after a poll has returned [`AppControlFlow::Exit`] because the exit request was confirmed.
    pub fn has_exited(&self) -> bool {
        self.exited
    }
138
    /// Notify an event directly to the app extensions and to the `observer`.
    ///
    /// The `update` is delivered immediately, it is not queued. For each phase the
    /// extensions are called first and the `observer` second, the phases are
    /// *preview*, *ui* and *event*, in this order.
    pub fn notify_event<O: AppEventObserver>(&mut self, mut update: EventUpdate, observer: &mut O) {
        let _scope = tracing::trace_span!("notify_event", event = update.event().name()).entered();

        // guard held until the end of the notification.
        let _t = INSTANT_APP.pause_for_update();

        // the event itself sees the update first.
        update.event().on_update(&mut update);

        self.extensions.event_preview(&mut update);
        observer.event_preview(&mut update);
        // runs after the preview phase and before the UI phase.
        update.call_pre_actions();

        self.extensions.event_ui(&mut update);
        observer.event_ui(&mut update);

        self.extensions.event(&mut update);
        observer.event(&mut update);
        // runs after all phases.
        update.call_pos_actions();
    }
158
    /// Converts a view-process input device ID to the app [`InputDeviceId`], the mapping is done by `VIEW_PROCESS`.
    fn input_device_id(&mut self, id: zng_view_api::raw_input::InputDeviceId) -> InputDeviceId {
        VIEW_PROCESS.input_device_id(id)
    }
162
    /// Process a view-process event.
    ///
    /// Most events are converted to the equivalent `Raw*Args` and notified immediately using
    /// [`notify_event`], window, monitor and device IDs are converted to the app ID types first.
    /// Some events only update `VIEW_PROCESS` or another service and raise no event here.
    ///
    /// [`notify_event`]: Self::notify_event
    fn on_view_event<O: AppEventObserver>(&mut self, ev: zng_view_api::Event, observer: &mut O) {
        use crate::view_process::raw_device_events::*;
        use crate::view_process::raw_events::*;
        use zng_view_api::Event;

        // view-process window ID to app window ID.
        fn window_id(id: zng_view_api::window::WindowId) -> WindowId {
            WindowId::from_raw(id.get())
        }

        match ev {
            // mouse pointer over a window
            Event::MouseMoved {
                window: w_id,
                device: d_id,
                coalesced_pos,
                position,
            } => {
                let args = RawMouseMovedArgs::now(window_id(w_id), self.input_device_id(d_id), coalesced_pos, position);
                self.notify_event(RAW_MOUSE_MOVED_EVENT.new_update(args), observer);
            }
            Event::MouseEntered {
                window: w_id,
                device: d_id,
            } => {
                let args = RawMouseArgs::now(window_id(w_id), self.input_device_id(d_id));
                self.notify_event(RAW_MOUSE_ENTERED_EVENT.new_update(args), observer);
            }
            Event::MouseLeft {
                window: w_id,
                device: d_id,
            } => {
                let args = RawMouseArgs::now(window_id(w_id), self.input_device_id(d_id));
                self.notify_event(RAW_MOUSE_LEFT_EVENT.new_update(args), observer);
            }
            // window state, position, monitor or size changed
            Event::WindowChanged(c) => {
                let monitor_id = c.monitor.map(|id| VIEW_PROCESS.monitor_id(id));
                let args = RawWindowChangedArgs::now(
                    window_id(c.window),
                    c.state,
                    c.position,
                    monitor_id,
                    c.size,
                    c.safe_padding,
                    c.cause,
                    c.frame_wait_id,
                );
                self.notify_event(RAW_WINDOW_CHANGED_EVENT.new_update(args), observer);
            }
            // drag&drop
            Event::DragHovered { window, data, allowed } => {
                let args = RawDragHoveredArgs::now(window_id(window), data, allowed);
                self.notify_event(RAW_DRAG_HOVERED_EVENT.new_update(args), observer);
            }
            Event::DragMoved {
                window,
                coalesced_pos,
                position,
            } => {
                let args = RawDragMovedArgs::now(window_id(window), coalesced_pos, position);
                self.notify_event(RAW_DRAG_MOVED_EVENT.new_update(args), observer);
            }
            Event::DragDropped {
                window,
                data,
                allowed,
                drop_id,
            } => {
                let args = RawDragDroppedArgs::now(window_id(window), data, allowed, drop_id);
                self.notify_event(RAW_DRAG_DROPPED_EVENT.new_update(args), observer);
            }
            Event::DragCancelled { window } => {
                let args = RawDragCancelledArgs::now(window_id(window));
                self.notify_event(RAW_DRAG_CANCELLED_EVENT.new_update(args), observer);
            }
            Event::AppDragEnded { window, drag, applied } => {
                let args = RawAppDragEndedArgs::now(window_id(window), drag, applied);
                self.notify_event(RAW_APP_DRAG_ENDED_EVENT.new_update(args), observer);
            }
            // window focus, both sides are optional
            Event::FocusChanged { prev, new } => {
                let args = RawWindowFocusArgs::now(prev.map(window_id), new.map(window_id));
                self.notify_event(RAW_WINDOW_FOCUS_EVENT.new_update(args), observer);
            }
            // keyboard and IME
            Event::KeyboardInput {
                window: w_id,
                device: d_id,
                key_code,
                state,
                key,
                key_location,
                key_modified,
                text,
            } => {
                // the args order is not the same as the event fields order.
                let args = RawKeyInputArgs::now(
                    window_id(w_id),
                    self.input_device_id(d_id),
                    key_code,
                    key_location,
                    state,
                    key,
                    key_modified,
                    text,
                );
                self.notify_event(RAW_KEY_INPUT_EVENT.new_update(args), observer);
            }
            Event::Ime { window: w_id, ime } => {
                let args = RawImeArgs::now(window_id(w_id), ime);
                self.notify_event(RAW_IME_EVENT.new_update(args), observer);
            }

            // other pointer and touch input targeting a window
            Event::MouseWheel {
                window: w_id,
                device: d_id,
                delta,
                phase,
            } => {
                let args = RawMouseWheelArgs::now(window_id(w_id), self.input_device_id(d_id), delta, phase);
                self.notify_event(RAW_MOUSE_WHEEL_EVENT.new_update(args), observer);
            }
            Event::MouseInput {
                window: w_id,
                device: d_id,
                state,
                button,
            } => {
                let args = RawMouseInputArgs::now(window_id(w_id), self.input_device_id(d_id), state, button);
                self.notify_event(RAW_MOUSE_INPUT_EVENT.new_update(args), observer);
            }
            Event::TouchpadPressure {
                window: w_id,
                device: d_id,
                pressure,
                stage,
            } => {
                let args = RawTouchpadPressureArgs::now(window_id(w_id), self.input_device_id(d_id), pressure, stage);
                self.notify_event(RAW_TOUCHPAD_PRESSURE_EVENT.new_update(args), observer);
            }
            Event::AxisMotion {
                window: w_id,
                device: d_id,
                axis,
                value,
            } => {
                let args = RawAxisMotionArgs::now(window_id(w_id), self.input_device_id(d_id), axis, value);
                self.notify_event(RAW_AXIS_MOTION_EVENT.new_update(args), observer);
            }
            Event::Touch {
                window: w_id,
                device: d_id,
                touches,
            } => {
                let args = RawTouchArgs::now(window_id(w_id), self.input_device_id(d_id), touches);
                self.notify_event(RAW_TOUCH_EVENT.new_update(args), observer);
            }
            // monitors
            Event::ScaleFactorChanged {
                monitor: id,
                windows,
                scale_factor,
            } => {
                let monitor_id = VIEW_PROCESS.monitor_id(id);
                let windows: Vec<_> = windows.into_iter().map(window_id).collect();
                let args = RawScaleFactorChangedArgs::now(monitor_id, windows, scale_factor);
                self.notify_event(RAW_SCALE_FACTOR_CHANGED_EVENT.new_update(args), observer);
            }
            Event::MonitorsChanged(monitors) => {
                let monitors: Vec<_> = monitors.into_iter().map(|(id, info)| (VIEW_PROCESS.monitor_id(id), info)).collect();
                let args = RawMonitorsChangedArgs::now(monitors);
                self.notify_event(RAW_MONITORS_CHANGED_EVENT.new_update(args), observer);
            }
            // received and ignored, no event is raised for audio devices here.
            Event::AudioDevicesChanged(_audio_devices) => {
            }
            // window open/close
            Event::WindowCloseRequested(w_id) => {
                let args = RawWindowCloseRequestedArgs::now(window_id(w_id));
                self.notify_event(RAW_WINDOW_CLOSE_REQUESTED_EVENT.new_update(args), observer);
            }
            Event::WindowOpened(w_id, data) => {
                let w_id = window_id(w_id);
                // the service provides the window value that is sent in the args.
                let (window, data) = VIEW_PROCESS.on_window_opened(w_id, data);
                let args = RawWindowOpenArgs::now(w_id, window, data);
                self.notify_event(RAW_WINDOW_OPEN_EVENT.new_update(args), observer);
            }
            Event::HeadlessOpened(w_id, data) => {
                let w_id = window_id(w_id);
                let (surface, data) = VIEW_PROCESS.on_headless_opened(w_id, data);
                let args = RawHeadlessOpenArgs::now(w_id, surface, data);
                self.notify_event(RAW_HEADLESS_OPEN_EVENT.new_update(args), observer);
            }
            Event::WindowOrHeadlessOpenError { id: w_id, error } => {
                let w_id = window_id(w_id);
                let args = RawWindowOrHeadlessOpenErrorArgs::now(w_id, error);
                self.notify_event(RAW_WINDOW_OR_HEADLESS_OPEN_ERROR_EVENT.new_update(args), observer);
            }
            Event::WindowClosed(w_id) => {
                let args = RawWindowCloseArgs::now(window_id(w_id));
                self.notify_event(RAW_WINDOW_CLOSE_EVENT.new_update(args), observer);
            }
            // images, the event is only raised if the service returns an image for the ID.
            Event::ImageMetadataLoaded {
                image: id,
                size,
                ppi,
                is_mask,
            } => {
                if let Some(img) = VIEW_PROCESS.on_image_metadata_loaded(id, size, ppi, is_mask) {
                    let args = RawImageArgs::now(img);
                    self.notify_event(RAW_IMAGE_METADATA_LOADED_EVENT.new_update(args), observer);
                }
            }
            Event::ImagePartiallyLoaded {
                image: id,
                partial_size,
                ppi,
                is_opaque,
                is_mask,
                partial_pixels: partial_bgra8,
            } => {
                if let Some(img) = VIEW_PROCESS.on_image_partially_loaded(id, partial_size, ppi, is_opaque, is_mask, partial_bgra8) {
                    let args = RawImageArgs::now(img);
                    self.notify_event(RAW_IMAGE_PARTIALLY_LOADED_EVENT.new_update(args), observer);
                }
            }
            Event::ImageLoaded(image) => {
                if let Some(img) = VIEW_PROCESS.on_image_loaded(image) {
                    let args = RawImageArgs::now(img);
                    self.notify_event(RAW_IMAGE_LOADED_EVENT.new_update(args), observer);
                }
            }
            Event::ImageLoadError { image: id, error } => {
                if let Some(img) = VIEW_PROCESS.on_image_error(id, error) {
                    let args = RawImageArgs::now(img);
                    self.notify_event(RAW_IMAGE_LOAD_ERROR_EVENT.new_update(args), observer);
                }
            }
            // encode results are only forwarded to the service, no event is raised here.
            Event::ImageEncoded { image: id, format, data } => VIEW_PROCESS.on_image_encoded(id, format, data),
            Event::ImageEncodeError { image: id, format, error } => {
                VIEW_PROCESS.on_image_encode_error(id, format, error);
            }
            Event::FrameImageReady {
                window: w_id,
                frame: frame_id,
                image: image_id,
                selection,
            } => {
                if let Some(img) = VIEW_PROCESS.on_frame_image_ready(image_id) {
                    let args = RawFrameImageReadyArgs::now(img, window_id(w_id), frame_id, selection);
                    self.notify_event(RAW_FRAME_IMAGE_READY_EVENT.new_update(args), observer);
                }
            }

            // accessibility, the updates are built by the `access` module.
            Event::AccessInit { window: w_id } => {
                self.notify_event(crate::access::on_access_init(window_id(w_id)), observer);
            }
            Event::AccessCommand {
                window: win_id,
                target: wgt_id,
                command,
            } => {
                // not every access command generates an update.
                if let Some(update) = crate::access::on_access_command(window_id(win_id), WidgetId::from_raw(wgt_id.0), command) {
                    self.notify_event(update, observer);
                }
            }
            Event::AccessDeinit { window: w_id } => {
                self.notify_event(crate::access::on_access_deinit(window_id(w_id)), observer);
            }

            // native dialogs, the responses are only forwarded to the service.
            Event::MsgDialogResponse(id, response) => {
                VIEW_PROCESS.on_message_dlg_response(id, response);
            }
            Event::FileDialogResponse(id, response) => {
                VIEW_PROCESS.on_file_dlg_response(id, response);
            }

            // custom view-process extension event
            Event::ExtensionEvent(id, payload) => {
                let args = RawExtensionEventArgs::now(id, payload);
                self.notify_event(RAW_EXTENSION_EVENT.new_update(args), observer);
            }

            // system config
            Event::FontsChanged => {
                let args = RawFontChangedArgs::now();
                self.notify_event(RAW_FONT_CHANGED_EVENT.new_update(args), observer);
            }
            Event::FontAaChanged(aa) => {
                let args = RawFontAaChangedArgs::now(aa);
                self.notify_event(RAW_FONT_AA_CHANGED_EVENT.new_update(args), observer);
            }
            Event::MultiClickConfigChanged(cfg) => {
                let args = RawMultiClickConfigChangedArgs::now(cfg);
                self.notify_event(RAW_MULTI_CLICK_CONFIG_CHANGED_EVENT.new_update(args), observer);
            }
            Event::AnimationsConfigChanged(cfg) => {
                // the variables service is updated before the event notifies.
                VARS_APP.set_sys_animations_enabled(cfg.enabled);
                let args = RawAnimationsConfigChangedArgs::now(cfg);
                self.notify_event(RAW_ANIMATIONS_CONFIG_CHANGED_EVENT.new_update(args), observer);
            }
            Event::KeyRepeatConfigChanged(cfg) => {
                let args = RawKeyRepeatConfigChangedArgs::now(cfg);
                self.notify_event(RAW_KEY_REPEAT_CONFIG_CHANGED_EVENT.new_update(args), observer);
            }
            Event::TouchConfigChanged(cfg) => {
                let args = RawTouchConfigChangedArgs::now(cfg);
                self.notify_event(RAW_TOUCH_CONFIG_CHANGED_EVENT.new_update(args), observer);
            }
            Event::LocaleChanged(cfg) => {
                let args = RawLocaleChangedArgs::now(cfg);
                self.notify_event(RAW_LOCALE_CONFIG_CHANGED_EVENT.new_update(args), observer);
            }
            Event::ColorsConfigChanged(cfg) => {
                let args = RawColorsConfigChangedArgs::now(cfg);
                self.notify_event(RAW_COLORS_CONFIG_CHANGED_EVENT.new_update(args), observer);
            }
            Event::ChromeConfigChanged(cfg) => {
                let args = RawChromeConfigChangedArgs::now(cfg);
                self.notify_event(RAW_CHROME_CONFIG_CHANGED_EVENT.new_update(args), observer);
            }

            // input devices, these events do not target a window.
            Event::InputDevicesChanged(devices) => {
                let devices: HashMap<_, _> = devices.into_iter().map(|(d_id, info)| (self.input_device_id(d_id), info)).collect();
                // the service gets a copy, the event args take the map.
                INPUT_DEVICES.update(devices.clone());
                let args = InputDevicesChangedArgs::now(devices);
                self.notify_event(INPUT_DEVICES_CHANGED_EVENT.new_update(args), observer);
            }
            Event::InputDeviceEvent { device, event } => {
                let d_id = self.input_device_id(device);
                match event {
                    InputDeviceEvent::PointerMotion { delta } => {
                        let args = PointerMotionArgs::now(d_id, delta);
                        self.notify_event(POINTER_MOTION_EVENT.new_update(args), observer);
                    }
                    InputDeviceEvent::ScrollMotion { delta } => {
                        let args = ScrollMotionArgs::now(d_id, delta);
                        self.notify_event(SCROLL_MOTION_EVENT.new_update(args), observer);
                    }
                    InputDeviceEvent::AxisMotion { axis, value } => {
                        let args = AxisMotionArgs::now(d_id, axis, value);
                        self.notify_event(AXIS_MOTION_EVENT.new_update(args), observer);
                    }
                    InputDeviceEvent::Button { button, state } => {
                        let args = ButtonArgs::now(d_id, button, state);
                        self.notify_event(BUTTON_EVENT.new_update(args), observer);
                    }
                    InputDeviceEvent::Key { key_code, state } => {
                        let args = KeyArgs::now(d_id, key_code, state);
                        self.notify_event(KEY_EVENT.new_update(args), observer);
                    }
                    // any other device event is ignored.
                    _ => {}
                }
            }

            // the only event here that goes through `notify` on the event instead of `notify_event`.
            // NOTE(review): presumably that schedules the event for a later update, confirm.
            Event::LowMemory => {
                LOW_MEMORY_EVENT.notify(LowMemoryArgs::now());
            }

            // only logged, no event is raised.
            Event::RecoveredFromComponentPanic { component, recover, panic } => {
                tracing::error!(
                    "view-process recovered from internal component panic\n  component: {component}\n  recover: {recover}\n```panic\n{panic}\n```"
                );
            }

            // `push_coalesce` handles these variants without adding them to `pending_view_events`.
            Event::Inited(zng_view_api::Inited { .. }) | Event::Suspended | Event::Disconnected(_) | Event::FrameRendered(_) => {
                unreachable!()
            }
            // any other variant is ignored.
            _ => {}
        }
    }
531
532 fn on_view_rendered_event<O: AppEventObserver>(&mut self, ev: zng_view_api::window::EventFrameRendered, observer: &mut O) {
534 debug_assert!(ev.window != zng_view_api::window::WindowId::INVALID);
535 let window_id = WindowId::from_raw(ev.window.get());
536 let image = ev.frame_image.map(|img| VIEW_PROCESS.on_frame_image(img));
538 let args = crate::view_process::raw_events::RawFrameRenderedArgs::now(window_id, ev.frame, image);
539 self.notify_event(crate::view_process::raw_events::RAW_FRAME_RENDERED_EVENT.new_update(args), observer);
540 }
541
542 pub(crate) fn run_headed(mut self) {
543 let mut observer = ();
544 #[cfg(feature = "dyn_app_extension")]
545 let mut observer = observer.as_dyn();
546
547 self.apply_updates(&mut observer);
548 self.apply_update_events(&mut observer);
549 let mut wait = false;
550 loop {
551 wait = match self.poll_impl(wait, &mut observer) {
552 AppControlFlow::Poll => false,
553 AppControlFlow::Wait => true,
554 AppControlFlow::Exit => break,
555 };
556 }
557 }
558
559 fn push_coalesce<O: AppEventObserver>(&mut self, ev: AppEvent, observer: &mut O) {
560 match ev {
561 AppEvent::ViewEvent(ev) => match ev {
562 zng_view_api::Event::FrameRendered(ev) => {
563 if ev.window == zng_view_api::window::WindowId::INVALID {
564 tracing::error!("ignored rendered event for invalid window id, {ev:?}");
565 return;
566 }
567
568 let window = WindowId::from_raw(ev.window.get());
569
570 {
572 if VIEW_PROCESS.is_available() {
573 VIEW_PROCESS.on_frame_rendered(window);
574 }
575 }
576
577 #[cfg(debug_assertions)]
578 if self.pending_view_frame_events.iter().any(|e| e.window == ev.window) {
579 tracing::warn!("window `{window:?}` probably sent a frame request without awaiting renderer idle");
580 }
581
582 self.pending_view_frame_events.push(ev);
583 }
584 zng_view_api::Event::Inited(zng_view_api::Inited {
585 generation,
586 is_respawn,
587 extensions,
588 ..
589 }) => {
590 if is_respawn {
592 VIEW_PROCESS.on_respawned(generation);
593 APP_PROCESS_SV.read().is_suspended.set(false);
594 }
595
596 VIEW_PROCESS.handle_inited(generation, extensions.clone());
597
598 let args = crate::view_process::ViewProcessInitedArgs::now(generation, is_respawn, extensions);
599 self.notify_event(VIEW_PROCESS_INITED_EVENT.new_update(args), observer);
600 }
601 zng_view_api::Event::Suspended => {
602 VIEW_PROCESS.handle_suspended();
603 let args = crate::view_process::ViewProcessSuspendedArgs::now();
604 self.notify_event(VIEW_PROCESS_SUSPENDED_EVENT.new_update(args), observer);
605 APP_PROCESS_SV.read().is_suspended.set(true);
606 }
607 zng_view_api::Event::Disconnected(vp_gen) => {
608 VIEW_PROCESS.handle_disconnect(vp_gen);
610 }
611 ev => {
612 if let Some(last) = self.pending_view_events.last_mut() {
613 match last.coalesce(ev) {
614 Ok(()) => {}
615 Err(ev) => self.pending_view_events.push(ev),
616 }
617 } else {
618 self.pending_view_events.push(ev);
619 }
620 }
621 },
622 AppEvent::Event(ev) => EVENTS.notify(ev.get()),
623 AppEvent::Update(op, target) => {
624 UPDATES.update_op(op, target);
625 }
626 AppEvent::CheckUpdate => {}
627 AppEvent::ResumeUnwind(p) => std::panic::resume_unwind(p),
628 }
629 }
630
631 fn has_pending_updates(&mut self) -> bool {
632 !self.pending_view_events.is_empty() || self.pending.has_updates() || UPDATES.has_pending_updates() || !self.receiver.is_empty()
633 }
634
    /// Does one iteration of the app main loop, see `poll_impl` for the details.
    ///
    /// If `wait_app_event` is `true` the thread blocks until an app event is received or the
    /// nearest timer deadline is reached.
    pub(crate) fn poll<O: AppEventObserver>(&mut self, wait_app_event: bool, observer: &mut O) -> AppControlFlow {
        // with the "dyn_app_extension" feature the observer is converted before the call.
        // NOTE(review): presumably so that `poll_impl` is only instantiated for the dyn observer, confirm.
        #[cfg(feature = "dyn_app_extension")]
        let mut observer = observer.as_dyn();
        #[cfg(feature = "dyn_app_extension")]
        let observer = &mut observer;
        self.poll_impl(wait_app_event, observer)
    }
642 fn poll_impl<O: AppEventObserver>(&mut self, wait_app_event: bool, observer: &mut O) -> AppControlFlow {
643 let mut disconnected = false;
644
645 if self.exited {
646 return AppControlFlow::Exit;
647 }
648
649 if wait_app_event {
650 let idle = tracing::debug_span!("<idle>", ended_by = tracing::field::Empty).entered();
651
652 let timer = if self.view_is_busy() { None } else { self.loop_timer.poll() };
653 if let Some(time) = timer {
654 match self.receiver.recv_deadline_sp(time) {
655 Ok(ev) => {
656 idle.record("ended_by", "event");
657 drop(idle);
658 self.push_coalesce(ev, observer)
659 }
660 Err(e) => match e {
661 flume::RecvTimeoutError::Timeout => {
662 idle.record("ended_by", "timeout");
663 }
664 flume::RecvTimeoutError::Disconnected => {
665 idle.record("ended_by", "disconnected");
666 disconnected = true
667 }
668 },
669 }
670 } else {
671 match self.receiver.recv() {
672 Ok(ev) => {
673 idle.record("ended_by", "event");
674 drop(idle);
675 self.push_coalesce(ev, observer)
676 }
677 Err(e) => match e {
678 flume::RecvError::Disconnected => {
679 idle.record("ended_by", "disconnected");
680 disconnected = true
681 }
682 },
683 }
684 }
685 }
686 loop {
687 match self.receiver.try_recv() {
688 Ok(ev) => self.push_coalesce(ev, observer),
689 Err(e) => match e {
690 flume::TryRecvError::Empty => break,
691 flume::TryRecvError::Disconnected => {
692 disconnected = true;
693 break;
694 }
695 },
696 }
697 }
698 if disconnected {
699 panic!("app events channel disconnected");
700 }
701
702 if self.view_is_busy() {
703 return AppControlFlow::Wait;
704 }
705
706 UPDATES.on_app_awake();
707
708 let updated_timers = self.loop_timer.awake();
710 if updated_timers {
711 UPDATES.update_timers(&mut self.loop_timer);
713 self.apply_updates(observer);
714 }
715
716 let mut events = mem::take(&mut self.pending_view_events);
717 for ev in events.drain(..) {
718 self.on_view_event(ev, observer);
719 self.apply_updates(observer);
720 }
721 debug_assert!(self.pending_view_events.is_empty());
722 self.pending_view_events = events; let mut events = mem::take(&mut self.pending_view_frame_events);
725 for ev in events.drain(..) {
726 self.on_view_rendered_event(ev, observer);
727 }
728 self.pending_view_frame_events = events;
729
730 if self.has_pending_updates() {
731 self.apply_updates(observer);
732 self.apply_update_events(observer);
733 }
734
735 if self.view_is_busy() {
736 return AppControlFlow::Wait;
737 }
738
739 self.finish_frame(observer);
740
741 UPDATES.next_deadline(&mut self.loop_timer);
742
743 if self.extensions.0.exit() {
744 UPDATES.on_app_sleep();
745 self.exited = true;
746 AppControlFlow::Exit
747 } else if self.has_pending_updates() || UPDATES.has_pending_layout_or_render() {
748 AppControlFlow::Poll
749 } else {
750 UPDATES.on_app_sleep();
751 AppControlFlow::Wait
752 }
753 }
754
755 fn apply_updates<O: AppEventObserver>(&mut self, observer: &mut O) {
757 let _s = tracing::debug_span!("apply_updates").entered();
758
759 let mut run = true;
760 while run {
761 run = self.loop_monitor.update(|| {
762 let mut any = false;
763
764 self.pending |= UPDATES.apply_info();
765 if mem::take(&mut self.pending.info) {
766 any = true;
767 let _s = tracing::debug_span!("info").entered();
768
769 let mut info_widgets = mem::take(&mut self.pending.info_widgets);
770
771 let _t = INSTANT_APP.pause_for_update();
772
773 {
774 let _s = tracing::debug_span!("ext.info").entered();
775 self.extensions.info(&mut info_widgets);
776 }
777 {
778 let _s = tracing::debug_span!("obs.info").entered();
779 observer.info(&mut info_widgets);
780 }
781 }
782
783 self.pending |= UPDATES.apply_updates();
784 TimersService::notify();
785 if mem::take(&mut self.pending.update) {
786 any = true;
787 let _s = tracing::debug_span!("update").entered();
788
789 let mut update_widgets = mem::take(&mut self.pending.update_widgets);
790
791 let _t = INSTANT_APP.pause_for_update();
792
793 {
794 let _s = tracing::debug_span!("ext.update_preview").entered();
795 self.extensions.update_preview();
796 }
797 {
798 let _s = tracing::debug_span!("obs.update_preview").entered();
799 observer.update_preview();
800 }
801 UPDATES.on_pre_updates();
802
803 {
804 let _s = tracing::debug_span!("ext.update_ui").entered();
805 self.extensions.update_ui(&mut update_widgets);
806 }
807 {
808 let _s = tracing::debug_span!("obs.update_ui").entered();
809 observer.update_ui(&mut update_widgets);
810 }
811
812 {
813 let _s = tracing::debug_span!("ext.update").entered();
814 self.extensions.update();
815 }
816 {
817 let _s = tracing::debug_span!("obs.update").entered();
818 observer.update();
819 }
820 UPDATES.on_updates();
821 }
822
823 any
824 });
825 }
826 }
827
828 fn apply_update_events<O: AppEventObserver>(&mut self, observer: &mut O) {
830 let _s = tracing::debug_span!("apply_update_events").entered();
831
832 loop {
833 let events: Vec<_> = self.pending.events.drain(..).collect();
834 if events.is_empty() {
835 break;
836 }
837 for mut update in events {
838 let _s = tracing::debug_span!("update_event", ?update).entered();
839
840 self.loop_monitor.maybe_trace(|| {
841 let _t = INSTANT_APP.pause_for_update();
842
843 {
844 let _s = tracing::debug_span!("ext.event_preview").entered();
845 self.extensions.event_preview(&mut update);
846 }
847 {
848 let _s = tracing::debug_span!("obs.event_preview").entered();
849 observer.event_preview(&mut update);
850 }
851 update.call_pre_actions();
852
853 {
854 let _s = tracing::debug_span!("ext.event_ui").entered();
855 self.extensions.event_ui(&mut update);
856 }
857 {
858 let _s = tracing::debug_span!("obs.event_ui").entered();
859 observer.event_ui(&mut update);
860 }
861 {
862 let _s = tracing::debug_span!("ext.event").entered();
863 self.extensions.event(&mut update);
864 }
865 {
866 let _s = tracing::debug_span!("obs.event").entered();
867 observer.event(&mut update);
868 }
869 update.call_pos_actions();
870 });
871
872 self.apply_updates(observer);
873 }
874 }
875 }
876
877 fn view_is_busy(&mut self) -> bool {
878 VIEW_PROCESS.is_available() && VIEW_PROCESS.pending_frames() > 0
879 }
880
881 fn finish_frame<O: AppEventObserver>(&mut self, observer: &mut O) {
883 debug_assert!(!self.view_is_busy());
884
885 self.pending |= UPDATES.apply_layout_render();
886
887 while mem::take(&mut self.pending.layout) {
888 let _s = tracing::debug_span!("apply_layout").entered();
889
890 let mut layout_widgets = mem::take(&mut self.pending.layout_widgets);
891
892 self.loop_monitor.maybe_trace(|| {
893 let _t = INSTANT_APP.pause_for_update();
894
895 {
896 let _s = tracing::debug_span!("ext.layout").entered();
897 self.extensions.layout(&mut layout_widgets);
898 }
899 {
900 let _s = tracing::debug_span!("obs.layout").entered();
901 observer.layout(&mut layout_widgets);
902 }
903 });
904
905 self.apply_updates(observer);
906 self.pending |= UPDATES.apply_layout_render();
907 }
908
909 if mem::take(&mut self.pending.render) {
910 let _s = tracing::debug_span!("apply_render").entered();
911
912 let mut render_widgets = mem::take(&mut self.pending.render_widgets);
913 let mut render_update_widgets = mem::take(&mut self.pending.render_update_widgets);
914
915 let _t = INSTANT_APP.pause_for_update();
916
917 {
918 let _s = tracing::debug_span!("ext.render").entered();
919 self.extensions.render(&mut render_widgets, &mut render_update_widgets);
920 }
921 {
922 let _s = tracing::debug_span!("obs.render").entered();
923 observer.render(&mut render_widgets, &mut render_update_widgets);
924 }
925 }
926
927 self.loop_monitor.finish_frame();
928 }
929}
/// Deinits the extensions and then requests the view-process exit.
impl<E: AppExtension> Drop for RunningApp<E> {
    fn drop(&mut self) {
        let _s = tracing::debug_span!("ext.deinit").entered();
        // extensions deinit first, the view-process exit is only requested after.
        self.extensions.deinit();
        VIEW_PROCESS.exit();
    }
}
937
/// Arguments for [`on_app_start`] handlers.
///
/// Has no public data, the handlers are called during the app start, after the app extensions are inited.
pub struct AppStartArgs {
    // private field, only this module can construct the args.
    _private: (),
}
945
946pub fn on_app_start(handler: impl FnMut(&AppStartArgs) + Send + 'static) {
952 zng_unique_id::hot_static_ref!(ON_APP_START).lock().push(Box::new(handler))
953}
// Handlers registered by `on_app_start`, called by `RunningApp::start`.
zng_unique_id::hot_static! {
    static ON_APP_START: Mutex<Vec<AppStartHandler>> = Mutex::new(vec![]);
}
// Boxed app start handler, `Send` because the registry is a global static.
type AppStartHandler = Box<dyn FnMut(&AppStartArgs) + Send + 'static>;
958
/// App main loop timer.
///
/// Holds the time of the last awake and the nearest deadline requested since then.
#[derive(Debug)]
pub(crate) struct LoopTimer {
    /// Time sampled by the last `awake` call, or at creation.
    now: DInstant,
    /// Nearest deadline registered that has not elapsed yet, if any.
    deadline: Option<Deadline>,
}
965impl Default for LoopTimer {
966 fn default() -> Self {
967 Self {
968 now: INSTANT.now(),
969 deadline: None,
970 }
971 }
972}
973impl LoopTimer {
974 pub fn elapsed(&mut self, deadline: Deadline) -> bool {
977 if deadline.0 <= self.now {
978 true
979 } else {
980 self.register(deadline);
981 false
982 }
983 }
984
985 pub fn register(&mut self, deadline: Deadline) {
987 if let Some(d) = &mut self.deadline {
988 if deadline < *d {
989 *d = deadline;
990 }
991 } else {
992 self.deadline = Some(deadline)
993 }
994 }
995
996 pub(crate) fn poll(&mut self) -> Option<Deadline> {
998 self.deadline
999 }
1000
1001 pub(crate) fn awake(&mut self) -> bool {
1003 self.now = INSTANT.now();
1004 if let Some(d) = self.deadline
1005 && d.0 <= self.now
1006 {
1007 self.deadline = None;
1008 return true;
1009 }
1010 false
1011 }
1012
1013 pub fn now(&self) -> DInstant {
1015 self.now
1016 }
1017}
1018impl zng_var::animation::AnimationTimer for LoopTimer {
1019 fn elapsed(&mut self, deadline: Deadline) -> bool {
1020 self.elapsed(deadline)
1021 }
1022
1023 fn register(&mut self, deadline: Deadline) {
1024 self.register(deadline)
1025 }
1026
1027 fn now(&self) -> DInstant {
1028 self.now()
1029 }
1030}
1031
/// Monitors the update cycles of the app main loop to detect loops that never reach render.
#[derive(Default)]
struct LoopMonitor {
    /// Update cycles counted since the last reset.
    update_count: u16,
    /// Set when the cycle limit is reached and updates start being skipped.
    skipped: bool,
    /// Update requests traced from cycle 500 until the limit, used in the error message.
    trace: Vec<UpdateTrace>,
}
1038impl LoopMonitor {
1039 pub fn update(&mut self, update_once: impl FnOnce() -> bool) -> bool {
1041 self.update_count += 1;
1042
1043 if self.update_count < 500 {
1044 update_once()
1045 } else if self.update_count < 1000 {
1046 UpdatesTrace::collect_trace(&mut self.trace, update_once)
1047 } else if self.update_count == 1000 {
1048 self.skipped = true;
1049 let trace = UpdatesTrace::format_trace(mem::take(&mut self.trace));
1050 tracing::error!(
1051 "updated 1000 times without rendering, probably stuck in an infinite loop\n\
1052 will start skipping updates to render and poll system events\n\
1053 top 20 most frequent update requests (in 500 cycles):\n\
1054 {trace}\n\
1055 you can use `UpdatesTraceUiNodeExt` and `updates_trace_event` to refine the trace"
1056 );
1057 false
1058 } else if self.update_count == 1500 {
1059 self.update_count = 1001;
1060 false
1061 } else {
1062 update_once()
1063 }
1064 }
1065
1066 pub fn maybe_trace(&mut self, notify_once: impl FnOnce()) {
1067 if (500..1000).contains(&self.update_count) {
1068 UpdatesTrace::collect_trace(&mut self.trace, notify_once);
1069 } else {
1070 notify_once();
1071 }
1072 }
1073
1074 pub fn finish_frame(&mut self) {
1075 if !self.skipped {
1076 self.skipped = false;
1077 self.update_count = 0;
1078 self.trace = vec![];
1079 }
1080 }
1081}
1082
1083impl APP {
1084 pub fn exit(&self) -> ResponseVar<ExitCancelled> {
1093 APP_PROCESS_SV.write().exit()
1094 }
1095
1096 pub fn is_suspended(&self) -> Var<bool> {
1104 APP_PROCESS_SV.read().is_suspended.read_only()
1105 }
1106}
1107
1108impl APP {
1112 pub fn pause_time_for_update(&self) -> Var<bool> {
1118 APP_PROCESS_SV.read().pause_time_for_updates.clone()
1119 }
1120
1121 pub fn start_manual_time(&self) {
1129 INSTANT_APP.set_mode(InstantMode::Manual);
1130 INSTANT_APP.set_now(INSTANT.now());
1131 UPDATES.update(None);
1132 }
1133
1134 pub fn advance_manual_time(&self, advance: Duration) {
1145 INSTANT_APP.advance_now(advance);
1146 UPDATES.update(None);
1147 }
1148
1149 pub fn set_manual_time(&self, now: DInstant) {
1158 INSTANT_APP.set_now(now);
1159 UPDATES.update(None);
1160 }
1161
1162 pub fn end_manual_time(&self) {
1164 INSTANT_APP.set_mode(match APP.pause_time_for_update().get() {
1165 true => InstantMode::UpdatePaused,
1166 false => InstantMode::Now,
1167 });
1168 UPDATES.update(None);
1169 }
1170}
1171
command! {
    /// Command that requests the app process exit.
    ///
    /// The app handles this command itself, calling `APP.exit()` when it is notified and enabled.
    pub static EXIT_CMD = {
        l10n!: true,
        name: "Exit",
        info: "Close all windows and exit",
        shortcut: shortcut!(Exit),
    };
}
1183
/// Response sent to an exit request when the exit was cancelled.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ExitCancelled;
impl fmt::Display for ExitCancelled {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("exit request cancelled")
    }
}
1194
/// App extension built into the running app, handles the exit command and exit requests.
struct AppIntrinsic {
    // Subscription to `EXIT_CMD`, created enabled in `pre_init`.
    exit_handle: CommandHandle,
    // Exit request notified in `update` that is awaiting the decision made in `exit`.
    pending_exit: Option<PendingExit>,
}
/// An exit request that was notified as `EXIT_REQUESTED_EVENT` and is not resolved yet.
struct PendingExit {
    // Propagation handle of the notified event args, stopped propagation cancels the exit.
    handle: EventPropagationHandle,
    // Responder used to send `ExitCancelled` to the requester.
    response: ResponderVar<ExitCancelled>,
}
1203impl AppIntrinsic {
1204 pub(super) fn pre_init(is_headed: bool, with_renderer: bool, view_process_exe: PathBuf, view_process_env: HashMap<Txt, Txt>) -> Self {
1206 APP_PROCESS_SV
1207 .read()
1208 .pause_time_for_updates
1209 .hook(|a| {
1210 if !matches!(INSTANT.mode(), zng_time::InstantMode::Manual) {
1211 if *a.value() {
1212 INSTANT_APP.set_mode(InstantMode::UpdatePaused);
1213 } else {
1214 INSTANT_APP.set_mode(InstantMode::Now);
1215 }
1216 }
1217 true
1218 })
1219 .perm();
1220
1221 if is_headed {
1222 debug_assert!(with_renderer);
1223
1224 let view_evs_sender = UPDATES.sender();
1225 VIEW_PROCESS.start(view_process_exe, view_process_env, false, move |ev| {
1226 let _ = view_evs_sender.send_view_event(ev);
1227 });
1228 } else if with_renderer {
1229 let view_evs_sender = UPDATES.sender();
1230 VIEW_PROCESS.start(view_process_exe, view_process_env, true, move |ev| {
1231 let _ = view_evs_sender.send_view_event(ev);
1232 });
1233 }
1234
1235 AppIntrinsic {
1236 exit_handle: EXIT_CMD.subscribe(true),
1237 pending_exit: None,
1238 }
1239 }
1240
1241 pub(super) fn exit(&mut self) -> bool {
1243 if let Some(pending) = self.pending_exit.take() {
1244 if pending.handle.is_stopped() {
1245 pending.response.respond(ExitCancelled);
1246 false
1247 } else {
1248 true
1249 }
1250 } else {
1251 false
1252 }
1253 }
1254}
1255impl AppExtension for AppIntrinsic {
1256 fn event_preview(&mut self, update: &mut EventUpdate) {
1257 if VIEW_PROCESS_INITED_EVENT.has(update) {
1258 let filter = APP_PROCESS_SV.read().device_events_filter.get();
1259 if !filter.is_empty()
1260 && let Err(e) = VIEW_PROCESS.set_device_events_filter(filter)
1261 {
1262 tracing::error!("cannot set device events on the view-process, {e}");
1263 }
1264 } else if let Some(args) = EXIT_CMD.on(update) {
1265 args.handle_enabled(&self.exit_handle, |_| {
1266 APP.exit();
1267 });
1268 }
1269 }
1270
1271 fn update(&mut self) {
1272 let mut sv = APP_PROCESS_SV.write();
1273 if let Some(filter) = sv.device_events_filter.get_new()
1274 && let Err(e) = VIEW_PROCESS.set_device_events_filter(filter)
1275 {
1276 tracing::error!("cannot set device events on the view-process, {e}");
1277 }
1278 if let Some(response) = sv.take_requests() {
1279 let args = ExitRequestedArgs::now();
1280 self.pending_exit = Some(PendingExit {
1281 handle: args.propagation().clone(),
1282 response,
1283 });
1284 EXIT_REQUESTED_EVENT.notify(args);
1285 }
1286 }
1287}
1288
1289pub(crate) fn assert_not_view_process() {
1290 if zng_view_api::ViewConfig::from_env().is_some() {
1291 panic!("cannot start App in view-process");
1292 }
1293}
/// Spawns a thread that checks for `parking_lot` deadlocks every 10 seconds.
///
/// Only the first call spawns the thread, other calls do nothing. When deadlocks are found a
/// report with each deadlocked thread id and backtrace is written to stderr. With the
/// `"test_util"` feature the process also exits with code `-1` after the report.
#[cfg(feature = "deadlock_detection")]
pub fn spawn_deadlock_detection() {
    use parking_lot::deadlock;
    use std::{
        fmt::Write as _,
        sync::atomic::{self, AtomicBool},
        thread,
        time::Duration,
    };

    static CHECK_RUNNING: AtomicBool = AtomicBool::new(false);

    let already_running = CHECK_RUNNING.swap(true, atomic::Ordering::SeqCst);
    if already_running {
        return;
    }

    thread::spawn(|| {
        loop {
            thread::sleep(Duration::from_secs(10));

            let cycles = deadlock::check_deadlock();
            if cycles.is_empty() {
                continue;
            }

            let mut report = String::new();

            let _ = writeln!(report, "{} deadlocks detected", cycles.len());
            for (index, cycle) in cycles.iter().enumerate() {
                let _ = writeln!(report, "Deadlock #{}, {} threads", index, cycle.len());
                for stuck in cycle {
                    let _ = writeln!(report, "Thread Id {:#?}", stuck.thread_id());
                    let _ = writeln!(report, "{:#?}", stuck.backtrace());
                }
            }

            #[cfg(not(feature = "test_util"))]
            eprint!("{report}");

            #[cfg(feature = "test_util")]
            {
                use std::io::Write as _;
                // write directly to the stderr handle, then end the process.
                let _ = write!(&mut std::io::stderr(), "{report}");
                zng_env::exit(-1);
            }
        }
    });
}
/// Does nothing, deadlock detection is only compiled with the `"deadlock_detection"` feature.
#[cfg(not(feature = "deadlock_detection"))]
pub fn spawn_deadlock_detection() {}
1354
app_local! {
    // App process service state, starts with no exit request, no extensions info,
    // a default device events filter, time paused for updates and not suspended.
    pub(super) static APP_PROCESS_SV: AppProcessService = AppProcessService {
        exit_requests: None,
        extensions: None,
        device_events_filter: zng_var::var(Default::default()),
        pause_time_for_updates: zng_var::var(true),
        is_suspended: zng_var::var(false),
    };
}
1364
/// State shared by the `APP` process API and the intrinsic app extension.
pub(super) struct AppProcessService {
    // Responder of the exit request that was made and not yet taken by `take_requests`.
    exit_requests: Option<ResponderVar<ExitCancelled>>,
    // Extensions info set by `set_extensions`.
    extensions: Option<Arc<AppExtensionsInfo>>,
    // Filter sent to the view-process when it inits and when the variable changes.
    pub(crate) device_events_filter: Var<DeviceEventsFilter>,
    // Selects `InstantMode::UpdatePaused` (true) or `InstantMode::Now` (false).
    pause_time_for_updates: Var<bool>,
    // Exposed read-only by `APP.is_suspended`.
    is_suspended: Var<bool>,
}
1372impl AppProcessService {
1373 pub(super) fn take_requests(&mut self) -> Option<ResponderVar<ExitCancelled>> {
1374 self.exit_requests.take()
1375 }
1376
1377 fn exit(&mut self) -> ResponseVar<ExitCancelled> {
1378 if let Some(r) = &self.exit_requests {
1379 r.response_var()
1380 } else {
1381 let (responder, response) = response_var();
1382 self.exit_requests = Some(responder);
1383 UPDATES.update(None);
1384 response
1385 }
1386 }
1387
1388 pub(super) fn extensions(&self) -> Arc<AppExtensionsInfo> {
1389 self.extensions
1390 .clone()
1391 .unwrap_or_else(|| Arc::new(AppExtensionsInfo { infos: vec![] }))
1392 }
1393
1394 pub(super) fn set_extensions(&mut self, info: AppExtensionsInfo) {
1395 self.extensions = Some(Arc::new(info));
1396 }
1397}
1398
/// Message sent to the app main loop through the [`AppEventSender`] channel.
#[derive(Debug)]
// NOTE(review): variants have very different sizes, presumably `ViewEvent` is the largest.
#[allow(clippy::large_enum_variant)]
pub(crate) enum AppEvent {
    /// Event received from the view-process.
    ViewEvent(zng_view_api::Event),
    /// Event update message, sent by `AppEventSender::send_event`.
    Event(crate::event::EventUpdateMsg),
    /// Update request with an optional target widget, sent by `send_update` and app wakers.
    Update(UpdateOp, Option<WidgetId>),
    /// Panic payload, sent by `AppEventSender::send_resume_unwind`.
    ResumeUnwind(PanicPayload),
    /// Sent by `AppEventSender::send_check_update`.
    CheckUpdate,
}
1414
/// Cloneable sender of [`AppEvent`] messages to the app main loop.
///
/// Wraps the sending side of the unbounded channel created by `AppEventSender::new`.
#[derive(Clone)]
pub struct AppEventSender(flume::Sender<AppEvent>);
1422impl AppEventSender {
1423 pub(crate) fn new() -> (Self, flume::Receiver<AppEvent>) {
1424 let (sender, receiver) = flume::unbounded();
1425 (Self(sender), receiver)
1426 }
1427
1428 #[allow(clippy::result_large_err)] fn send_app_event(&self, event: AppEvent) -> Result<(), AppChannelError> {
1430 self.0.send(event).map_err(|_| AppChannelError::Disconnected)
1431 }
1432
1433 #[allow(clippy::result_large_err)]
1434 fn send_view_event(&self, event: zng_view_api::Event) -> Result<(), AppChannelError> {
1435 self.0.send(AppEvent::ViewEvent(event)).map_err(|_| AppChannelError::Disconnected)
1436 }
1437
1438 pub fn send_update(&self, op: UpdateOp, target: impl Into<Option<WidgetId>>) -> Result<(), AppChannelError> {
1440 UpdatesTrace::log_update();
1441 self.send_app_event(AppEvent::Update(op, target.into()))
1442 .map_err(|_| AppChannelError::Disconnected)
1443 }
1444
1445 pub(crate) fn send_event(&self, event: crate::event::EventUpdateMsg) -> Result<(), AppChannelError> {
1447 self.send_app_event(AppEvent::Event(event))
1448 .map_err(|_| AppChannelError::Disconnected)
1449 }
1450
1451 pub fn send_resume_unwind(&self, payload: PanicPayload) -> Result<(), AppChannelError> {
1453 self.send_app_event(AppEvent::ResumeUnwind(payload))
1454 .map_err(|_| AppChannelError::Disconnected)
1455 }
1456
1457 pub(crate) fn send_check_update(&self) -> Result<(), AppChannelError> {
1459 self.send_app_event(AppEvent::CheckUpdate)
1460 .map_err(|_| AppChannelError::Disconnected)
1461 }
1462
1463 pub fn waker(&self, target: impl Into<Option<WidgetId>>) -> Waker {
1465 Arc::new(AppWaker(self.0.clone(), target.into())).into()
1466 }
1467
1468 pub fn ext_channel<T>(&self) -> (AppExtSender<T>, AppExtReceiver<T>) {
1470 let (sender, receiver) = flume::unbounded();
1471
1472 (
1473 AppExtSender {
1474 update: self.clone(),
1475 sender,
1476 },
1477 AppExtReceiver { receiver },
1478 )
1479 }
1480
1481 pub fn ext_channel_bounded<T>(&self, cap: usize) -> (AppExtSender<T>, AppExtReceiver<T>) {
1483 let (sender, receiver) = flume::bounded(cap);
1484
1485 (
1486 AppExtSender {
1487 update: self.clone(),
1488 sender,
1489 },
1490 AppExtReceiver { receiver },
1491 )
1492 }
1493}
1494
1495struct AppWaker(flume::Sender<AppEvent>, Option<WidgetId>);
1496impl std::task::Wake for AppWaker {
1497 fn wake(self: std::sync::Arc<Self>) {
1498 self.wake_by_ref()
1499 }
1500 fn wake_by_ref(self: &Arc<Self>) {
1501 let _ = self.0.send(AppEvent::Update(UpdateOp::Update, self.1));
1502 }
1503}
1504
/// Boxed panic payload carried by `AppEvent::ResumeUnwind`.
type PanicPayload = Box<dyn std::any::Any + Send + 'static>;
1506
1507pub struct AppExtSender<T> {
1511 update: AppEventSender,
1512 sender: flume::Sender<T>,
1513}
1514impl<T> Clone for AppExtSender<T> {
1515 fn clone(&self) -> Self {
1516 Self {
1517 update: self.update.clone(),
1518 sender: self.sender.clone(),
1519 }
1520 }
1521}
1522impl<T: Send> AppExtSender<T> {
1523 pub fn send(&self, msg: T) -> Result<(), AppChannelError> {
1525 match self.update.send_update(UpdateOp::Update, None) {
1526 Ok(()) => self.sender.send(msg).map_err(|_| AppChannelError::Disconnected),
1527 Err(_) => Err(AppChannelError::Disconnected),
1528 }
1529 }
1530
1531 pub fn send_timeout(&self, msg: T, dur: Duration) -> Result<(), AppChannelError> {
1533 match self.update.send_update(UpdateOp::Update, None) {
1534 Ok(()) => self.sender.send_timeout(msg, dur).map_err(|e| match e {
1535 flume::SendTimeoutError::Timeout(_) => AppChannelError::Timeout,
1536 flume::SendTimeoutError::Disconnected(_) => AppChannelError::Disconnected,
1537 }),
1538 Err(_) => Err(AppChannelError::Disconnected),
1539 }
1540 }
1541
1542 pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), AppChannelError> {
1544 match self.update.send_update(UpdateOp::Update, None) {
1545 Ok(()) => self.sender.send_deadline(msg, deadline).map_err(|e| match e {
1546 flume::SendTimeoutError::Timeout(_) => AppChannelError::Timeout,
1547 flume::SendTimeoutError::Disconnected(_) => AppChannelError::Disconnected,
1548 }),
1549 Err(_) => Err(AppChannelError::Disconnected),
1550 }
1551 }
1552}
1553
1554pub struct AppExtReceiver<T> {
1558 receiver: flume::Receiver<T>,
1559}
1560impl<T> Clone for AppExtReceiver<T> {
1561 fn clone(&self) -> Self {
1562 Self {
1563 receiver: self.receiver.clone(),
1564 }
1565 }
1566}
1567impl<T> AppExtReceiver<T> {
1568 pub fn try_recv(&self) -> Result<T, Option<AppChannelError>> {
1573 self.receiver.try_recv().map_err(|e| match e {
1574 flume::TryRecvError::Empty => None,
1575 flume::TryRecvError::Disconnected => Some(AppChannelError::Disconnected),
1576 })
1577 }
1578}
1579
/// Error of the app channels, returned by the send and receive operations in this module.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum AppChannelError {
    /// The other end of the channel disconnected.
    ///
    /// Returned both by send operations (receiver gone) and by receive operations (sender gone).
    Disconnected,
    /// Deadline elapsed before the message could be sent or received.
    Timeout,
}
impl fmt::Display for AppChannelError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The `Disconnected` text is direction neutral because this variant is returned by
        // send methods too, not only when receiving.
        let msg = match self {
            AppChannelError::Disconnected => "channel disconnected",
            AppChannelError::Timeout => "deadline elapsed before message could be sent/received",
        };
        f.write_str(msg)
    }
}
impl std::error::Error for AppChannelError {}
1598impl From<flume::RecvTimeoutError> for AppChannelError {
1599 fn from(value: flume::RecvTimeoutError) -> Self {
1600 match value {
1601 flume::RecvTimeoutError::Timeout => AppChannelError::Timeout,
1602 flume::RecvTimeoutError::Disconnected => AppChannelError::Disconnected,
1603 }
1604 }
1605}
1606
event_args! {
    /// Arguments for [`EXIT_REQUESTED_EVENT`].
    ///
    /// Stopping the propagation of this event cancels the exit request.
    pub struct ExitRequestedArgs {

        ..

        fn delivery_list(&self, list: &mut UpdateDeliveryList) {
            // no specific target, the list searches all.
            list.search_all()
        }
    }
}
1621
event! {
    /// Event notified when an exit request is taken by the app on update.
    ///
    /// If the propagation of the event args is stopped the request responds with [`ExitCancelled`].
    pub static EXIT_REQUESTED_EVENT: ExitRequestedArgs;
}
1632
/// Extension methods for the channel receiver used by the app loop.
trait ReceiverExt<T> {
    /// Receives a message or times out at `deadline`.
    ///
    /// The implementation in this file blocks for most of the wait and then polls with
    /// thread yields for the final part of it.
    fn recv_deadline_sp(&self, deadline: Deadline) -> Result<T, flume::RecvTimeoutError>;
}
1638
// Margin subtracted from the blocking wait in `recv_deadline_sp`, 20ms on Windows and 10ms elsewhere.
// NOTE(review): presumably the worst expected oversleep of a blocking wait on each platform.
const WORST_SLEEP_ERR: Duration = Duration::from_millis(if cfg!(windows) { 20 } else { 10 });
// Margin subtracted from the receiving spin in `recv_deadline_sp`, 2ms on Windows and 1ms elsewhere.
const WORST_SPIN_ERR: Duration = Duration::from_millis(if cfg!(windows) { 2 } else { 1 });
1641
impl<T> ReceiverExt<T> for flume::Receiver<T> {
    /// Receives a message or times out at `deadline`, waiting in up to three phases.
    ///
    /// 1. While more than `WORST_SLEEP_ERR` remains, blocks in the channel until that margin.
    /// 2. While more than `WORST_SPIN_ERR` remains, polls `try_recv` yielding the thread.
    /// 3. For the remaining time only yields the thread, then returns `Timeout`.
    ///
    /// The remaining time is computed again from `INSTANT.now()` on every loop iteration.
    fn recv_deadline_sp(&self, deadline: Deadline) -> Result<T, flume::RecvTimeoutError> {
        loop {
            // `None` here means the deadline is already in the past.
            if let Some(d) = deadline.0.checked_duration_since(INSTANT.now()) {
                if matches!(INSTANT.mode(), zng_time::InstantMode::Manual) {
                    // Manual time always waits by duration, clamped to zero when less than
                    // `WORST_SLEEP_ERR` remains.
                    // NOTE(review): presumably because manual time is not in sync with the system
                    // clock. If manual time does not advance this branch is selected again after
                    // each timeout, confirm that is acceptable.
                    match self.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
                        // timed out, compute the remaining time again.
                        Err(flume::RecvTimeoutError::Timeout) => continue,
                        // message or disconnect.
                        interrupt => return interrupt,
                    }
                } else if d > WORST_SLEEP_ERR {
                    // blocking wait up to `WORST_SLEEP_ERR` before the deadline.
                    #[cfg(not(target_arch = "wasm32"))]
                    match self.recv_deadline(deadline.0.checked_sub(WORST_SLEEP_ERR).unwrap().into()) {
                        // timed out, continue to the spin phases.
                        Err(flume::RecvTimeoutError::Timeout) => continue,
                        interrupt => return interrupt,
                    }

                    // wasm32 waits by duration instead of by instant.
                    // NOTE(review): presumably `recv_deadline` is not usable on this target, confirm.
                    #[cfg(target_arch = "wasm32")]
                    match self.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
                        Err(flume::RecvTimeoutError::Timeout) => continue,
                        interrupt => return interrupt,
                    }
                } else if d > WORST_SPIN_ERR {
                    let spin_deadline = Deadline(deadline.0.checked_sub(WORST_SPIN_ERR).unwrap());

                    // receiving spin, polls the channel until `WORST_SPIN_ERR` before the deadline.
                    while !spin_deadline.has_elapsed() {
                        match self.try_recv() {
                            Err(flume::TryRecvError::Empty) => std::thread::yield_now(),
                            Err(flume::TryRecvError::Disconnected) => return Err(flume::RecvTimeoutError::Disconnected),
                            Ok(msg) => return Ok(msg),
                        }
                    }
                    // continue to the final spin.
                    continue;
                } else {
                    // final spin, does not poll the channel, only waits for the deadline.
                    while !deadline.has_elapsed() {
                        std::thread::yield_now();
                    }
                    return Err(flume::RecvTimeoutError::Timeout);
                }
            } else {
                return Err(flume::RecvTimeoutError::Timeout);
            }
        }
    }
}