1use crate::command::{CommandInner, Effect};
2use crate::keyed_tasks::{CommandMeta, KeyedEffectContext};
3use crate::model::Model;
4use crate::observability::{
5 Observability, ProgramConfig, QueueOverflowAction, RuntimeEvent, TelemetryEvent,
6};
7use crate::queue::{QueueReservation, QueueTracker, QueuedMessage};
8use crate::runtime::Runtime;
9use crate::{Command, CommandKind, Dispatcher, Key, ModelContext};
10use futures::StreamExt;
11use gpui::{App, AppContext, AsyncApp, Context, Entity, IntoElement, Render, Task, Window};
12use std::{fmt, sync::Arc};
13
/// Point-in-time counters describing a program's runtime, as returned by
/// `Program::runtime_snapshot`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RuntimeSnapshot {
    /// Depth reported by the runtime's queue tracker when the snapshot was taken.
    pub queue_depth: usize,
    /// Value of the runtime queue's `is_draining` flag when the snapshot was taken.
    pub is_draining: bool,
    /// Number of entries in the runtime's keyed-task registry.
    pub active_keyed_tasks: usize,
    /// Number of entries in the runtime's subscription registry.
    pub active_subscriptions: usize,
}
26
27pub struct Program<M: Model> {
29 model: M,
30 runtime: Runtime<M::Msg>,
31}
32
33impl<M> fmt::Debug for Program<M>
34where
35 M: Model + fmt::Debug,
36{
37 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
38 formatter
39 .debug_struct("Program")
40 .field("model", &self.model)
41 .finish_non_exhaustive()
42 }
43}
44
45impl<M: Model> Program<M> {
46 fn build_runtime(config: ProgramConfig<M::Msg>, cx: &mut Context<'_, Self>) -> Runtime<M::Msg> {
47 let queue_policy = config.queue_policy;
48 let queue_tracker = Arc::new(QueueTracker::new(queue_policy));
49 let observability = Observability::new(
50 config,
51 Arc::new({
52 let queue_tracker = queue_tracker.clone();
53 move || queue_tracker.depth()
54 }),
55 );
56 let (sender, mut receiver) = futures::channel::mpsc::unbounded();
57 let dispatcher = Dispatcher::new(sender, queue_tracker.clone(), observability.clone());
58 let program = cx.weak_entity();
59 let receive_task = cx.spawn(move |_this, async_cx: &mut AsyncApp| {
60 let mut async_cx = async_cx.clone();
61 async move {
62 while let Some(message) = receiver.next().await {
63 program
64 .update(&mut async_cx, |program, cx| {
65 program.receive_enqueued(message, cx);
66 })
67 .ok();
68 }
69 }
70 });
71
72 Runtime::new(dispatcher, queue_tracker, receive_task, observability)
73 }
74
75 pub fn mount(model: M, cx: &mut App) -> Entity<Self> {
89 Self::mount_with(model, ProgramConfig::default(), cx)
90 }
91
92 pub fn mount_with(model: M, config: ProgramConfig<M::Msg>, cx: &mut App) -> Entity<Self> {
107 cx.new(|cx| {
108 let runtime = Self::build_runtime(config, cx);
109 let mut program = Self { model, runtime };
110 let init_command = program.model.init(cx, &ModelContext::root());
111 program.execute_command(init_command, cx);
112 program.reconcile_subscriptions(cx);
113 program
114 })
115 }
116
117 pub fn model(&self) -> &M {
123 &self.model
124 }
125
126 pub fn runtime_snapshot(&self) -> RuntimeSnapshot {
132 RuntimeSnapshot {
133 queue_depth: self.runtime.queue_tracker.depth(),
134 is_draining: self.runtime.queue.is_draining,
135 active_keyed_tasks: self.runtime.tasks.len(),
136 active_subscriptions: self.runtime.subscriptions.len(),
137 }
138 }
139
140 pub fn dispatcher(&self) -> Dispatcher<M::Msg> {
146 self.runtime.dispatcher.clone()
147 }
148
149 fn receive_enqueued(&mut self, queued: QueuedMessage<M::Msg>, cx: &mut Context<'_, Self>) {
150 if self.runtime.queue_tracker.take_if_dropped(queued.id) {
151 return;
152 }
153
154 self.runtime.queue.pending.push_back(queued);
155 self.observe_queue_warning_if_needed();
156 self.drain_queue(cx);
157 }
158
159 fn enqueue_message(&mut self, message: M::Msg, cx: &mut Context<'_, Self>) {
160 let message_description = self.runtime.observability.describe_message_value(&message);
161 let reservation = self.runtime.queue_tracker.reserve_enqueue();
162
163 match reservation {
164 QueueReservation::Rejected => {
165 self.runtime
166 .observability
167 .observe_telemetry(TelemetryEvent::QueueOverflow {
168 policy: self.runtime.observability.queue_policy(),
169 action: QueueOverflowAction::RejectedNew,
170 message_description,
171 });
172 }
173 QueueReservation::DroppedNewest => {
174 self.runtime
175 .observability
176 .observe_telemetry(TelemetryEvent::QueueOverflow {
177 policy: self.runtime.observability.queue_policy(),
178 action: QueueOverflowAction::DroppedNewest,
179 message_description,
180 });
181 }
182 QueueReservation::Accepted {
183 id,
184 overflow_action,
185 ..
186 } => {
187 if let Some(action) = overflow_action {
188 self.runtime
189 .observability
190 .observe_telemetry(TelemetryEvent::QueueOverflow {
191 policy: self.runtime.observability.queue_policy(),
192 action,
193 message_description: message_description.clone(),
194 });
195 }
196
197 self.receive_enqueued(QueuedMessage { id, message }, cx);
198 }
199 }
200 }
201
202 fn observe_queue_warning_if_needed(&self) {
203 if let Some(threshold) = self.runtime.observability.queue_warning_threshold() {
204 let queued = self.runtime.queue_tracker.depth();
205 if queued > threshold {
206 self.runtime
207 .observability
208 .observe_runtime(RuntimeEvent::QueueWarning { queued, threshold });
209 self.runtime
210 .observability
211 .observe_telemetry(TelemetryEvent::QueueWarning { queued, threshold });
212 }
213 }
214 }
215
216 fn drain_queue(&mut self, cx: &mut Context<'_, Self>) {
217 if self.runtime.queue.is_draining {
218 return;
219 }
220
221 self.runtime.queue.is_draining = true;
222 let queued = self.runtime.queue.pending.len();
223 self.runtime
224 .observability
225 .observe_runtime(RuntimeEvent::QueueDrainStarted { queued });
226 self.runtime
227 .observability
228 .observe_telemetry(TelemetryEvent::QueueDrainStarted { queued });
229
230 let mut processed = 0;
231
232 while let Some(queued) = self.runtime.queue.pending.pop_front() {
233 if self.runtime.queue_tracker.take_if_dropped(queued.id) {
234 continue;
235 }
236
237 self.runtime.queue_tracker.complete_processed(queued.id);
238 let message_description = self
239 .runtime
240 .observability
241 .describe_message_value(&queued.message);
242 self.runtime
243 .observability
244 .observe_runtime(RuntimeEvent::MessageProcessed {
245 message: &queued.message,
246 message_description: message_description.clone(),
247 });
248 self.runtime
249 .observability
250 .observe_telemetry(TelemetryEvent::MessageProcessed {
251 message: &queued.message,
252 message_description,
253 });
254
255 let command = self.model.update(queued.message, cx, &ModelContext::root());
256 processed += 1;
257 self.execute_command(command, cx);
258 }
259
260 self.runtime.queue.is_draining = false;
261
262 if processed > 0 {
263 self.reconcile_subscriptions(cx);
264 cx.notify();
265 }
266
267 let remaining = self.runtime.queue.pending.len();
268 self.runtime
269 .observability
270 .observe_runtime(RuntimeEvent::QueueDrainFinished {
271 processed,
272 remaining,
273 });
274 self.runtime
275 .observability
276 .observe_telemetry(TelemetryEvent::QueueDrainFinished {
277 processed,
278 remaining,
279 });
280 }
281
282 fn execute_command(&mut self, command: Command<M::Msg>, cx: &mut Context<'_, Self>) {
283 let (label, inner) = command.into_parts();
284
285 match inner {
286 CommandInner::None => {}
287 CommandInner::Emit(message) => {
288 let meta = CommandMeta::new(CommandKind::Emit, label);
289 self.observe_command_scheduled(&meta, None);
290 self.observe_effect_completed(&meta, None, Some(&message));
291 self.enqueue_message(message, cx);
292 }
293 CommandInner::Batch(commands) => {
294 for command in commands {
295 self.execute_command(command, cx);
296 }
297 }
298 CommandInner::Effect(effect) => {
299 let meta = CommandMeta::new(effect.kind(), label);
300 self.observe_command_scheduled(&meta, None);
301 self.observe_effect_started(&meta, None);
302 Self::spawn_effect(effect, meta, None, cx).detach();
303 }
304 CommandInner::Keyed { key, effect } => {
305 let meta = CommandMeta::new(effect.kind(), label);
306 self.observe_command_scheduled(&meta, Some(&key));
307 self.observe_effect_started(&meta, Some(&key));
308 let generation = self.runtime.tasks.next_generation();
309 let keyed = KeyedEffectContext {
310 key: key.clone(),
311 generation,
312 };
313 let task = Self::spawn_effect(effect, meta.clone(), Some(keyed), cx);
314 let previous =
315 self.runtime
316 .tasks
317 .insert(key.clone(), generation, meta.clone(), task);
318
319 if let Some(previous) = previous {
320 let previous_kind = previous.meta.kind;
321 let previous_label = previous.meta.label().map(ToOwned::to_owned);
322 previous.task.detach();
323 self.runtime.observability.observe_runtime(
324 RuntimeEvent::KeyedCommandReplaced {
325 key: &key,
326 key_description: self.runtime.observability.describe_key_value(&key),
327 previous_kind,
328 previous_label: previous_label.as_deref(),
329 next_kind: meta.kind,
330 next_label: meta.label(),
331 },
332 );
333 self.runtime.observability.observe_telemetry(
334 TelemetryEvent::KeyedCommandReplaced {
335 key: &key,
336 key_description: self.runtime.observability.describe_key_value(&key),
337 previous_kind,
338 previous_label: previous_label.as_deref(),
339 next_kind: meta.kind,
340 next_label: meta.label(),
341 },
342 );
343 }
344 }
345 CommandInner::Cancel(key) => {
346 self.cancel_keyed_command(&key);
347 }
348 }
349 }
350
351 fn cancel_keyed_command(&mut self, key: &Key) {
352 if let Some(running) = self.runtime.tasks.cancel(key) {
353 let canceled_kind = running.meta.kind;
354 let canceled_label = running.meta.label().map(ToOwned::to_owned);
355 running.task.detach();
356 self.runtime
357 .observability
358 .observe_telemetry(TelemetryEvent::KeyedCommandCanceled {
359 key,
360 key_description: self.runtime.observability.describe_key_value(key),
361 canceled_kind,
362 canceled_label: canceled_label.as_deref(),
363 });
364 }
365 }
366
367 fn observe_command_scheduled(&self, meta: &CommandMeta, key: Option<&Key>) {
368 self.runtime
369 .observability
370 .observe_runtime(RuntimeEvent::CommandScheduled {
371 kind: meta.kind,
372 label: meta.label(),
373 key,
374 key_description: key
375 .and_then(|key| self.runtime.observability.describe_key_value(key)),
376 });
377 self.runtime
378 .observability
379 .observe_telemetry(TelemetryEvent::CommandScheduled {
380 kind: meta.kind,
381 label: meta.label(),
382 key,
383 key_description: key
384 .and_then(|key| self.runtime.observability.describe_key_value(key)),
385 });
386 }
387
388 fn observe_effect_started(&self, meta: &CommandMeta, key: Option<&Key>) {
389 self.runtime
390 .observability
391 .observe_telemetry(TelemetryEvent::EffectStarted {
392 kind: meta.kind,
393 label: meta.label(),
394 key,
395 key_description: key
396 .and_then(|key| self.runtime.observability.describe_key_value(key)),
397 });
398 }
399
400 fn observe_effect_completed(
401 &self,
402 meta: &CommandMeta,
403 key: Option<&Key>,
404 message: Option<&M::Msg>,
405 ) {
406 let key_description =
407 key.and_then(|key| self.runtime.observability.describe_key_value(key));
408 let message_description =
409 message.and_then(|message| self.runtime.observability.describe_message_value(message));
410
411 self.runtime
412 .observability
413 .observe_runtime(RuntimeEvent::EffectCompleted {
414 kind: meta.kind,
415 label: meta.label(),
416 key,
417 key_description: key_description.clone(),
418 emitted_message: message.is_some(),
419 message,
420 message_description: message_description.clone(),
421 });
422 self.runtime
423 .observability
424 .observe_telemetry(TelemetryEvent::EffectCompleted {
425 kind: meta.kind,
426 label: meta.label(),
427 key,
428 key_description,
429 emitted_message: message.is_some(),
430 message,
431 message_description,
432 });
433 }
434
435 fn apply_completion(
436 &mut self,
437 message: Option<M::Msg>,
438 meta: &CommandMeta,
439 keyed: Option<KeyedEffectContext>,
440 cx: &mut Context<'_, Self>,
441 ) {
442 let key = keyed.as_ref().map(|keyed| &keyed.key);
443 self.observe_effect_completed(meta, key, message.as_ref());
444
445 if let Some(keyed) = keyed {
446 if self.runtime.tasks.is_current(&keyed.key, keyed.generation) {
447 self.runtime
448 .tasks
449 .clear_current(&keyed.key, keyed.generation);
450 if let Some(message) = message {
451 self.enqueue_message(message, cx);
452 }
453 } else {
454 let key_description = self.runtime.observability.describe_key_value(&keyed.key);
455 let message_description = message
456 .as_ref()
457 .and_then(|message| self.runtime.observability.describe_message_value(message));
458
459 self.runtime.observability.observe_runtime(
460 RuntimeEvent::StaleKeyedCompletionIgnored {
461 kind: meta.kind,
462 label: meta.label(),
463 key: &keyed.key,
464 key_description: key_description.clone(),
465 emitted_message: message.is_some(),
466 message: message.as_ref(),
467 message_description: message_description.clone(),
468 },
469 );
470 self.runtime.observability.observe_telemetry(
471 TelemetryEvent::StaleKeyedCompletionIgnored {
472 kind: meta.kind,
473 label: meta.label(),
474 key: &keyed.key,
475 key_description,
476 emitted_message: message.is_some(),
477 message: message.as_ref(),
478 message_description,
479 },
480 );
481 }
482 } else if let Some(message) = message {
483 self.enqueue_message(message, cx);
484 }
485 }
486
487 fn spawn_effect(
488 effect: Effect<M::Msg>,
489 meta: CommandMeta,
490 keyed: Option<KeyedEffectContext>,
491 cx: &mut Context<'_, Self>,
492 ) -> Task<()> {
493 let program = cx.weak_entity();
494
495 match effect {
496 Effect::Foreground(effect) => cx.spawn(move |_this, async_cx: &mut AsyncApp| {
497 let mut async_cx = async_cx.clone();
498 async move {
499 let message = effect(&mut async_cx).await;
500 program
501 .update(&mut async_cx, |program, cx| {
502 program.apply_completion(message, &meta, keyed, cx);
503 })
504 .ok();
505 }
506 }),
507 Effect::Background(effect) => {
508 let executor = cx.background_executor().clone();
509 cx.spawn(move |_this, async_cx: &mut AsyncApp| {
510 let mut async_cx = async_cx.clone();
511 async move {
512 let spawned_executor = executor.clone();
513 let message = executor.spawn(effect(spawned_executor)).await;
514 program
515 .update(&mut async_cx, |program, cx| {
516 program.apply_completion(message, &meta, keyed, cx);
517 })
518 .ok();
519 }
520 })
521 }
522 }
523 }
524
525 fn reconcile_subscriptions(&mut self, cx: &mut Context<'_, Self>) {
526 let subscriptions = self.model.subscriptions(cx, &ModelContext::root());
527 let dispatcher = self.dispatcher();
528 let stats = self.runtime.subscriptions.reconcile(
529 subscriptions,
530 &dispatcher,
531 &self.runtime.observability,
532 cx,
533 );
534
535 self.runtime
536 .observability
537 .observe_runtime(RuntimeEvent::SubscriptionsReconciled {
538 active: stats.active,
539 added: stats.added,
540 removed: stats.removed,
541 retained: stats.retained,
542 });
543 self.runtime
544 .observability
545 .observe_telemetry(TelemetryEvent::SubscriptionsReconciled {
546 active: stats.active,
547 added: stats.added,
548 removed: stats.removed,
549 retained: stats.retained,
550 });
551 }
552}
553
554impl<M: Model> Render for Program<M> {
555 fn render(&mut self, window: &mut Window, cx: &mut Context<'_, Self>) -> impl IntoElement {
556 let dispatcher = self.dispatcher();
557 self.model
558 .view(window, cx, &ModelContext::root(), &dispatcher)
559 }
560}
561
562#[cfg(test)]
563mod runtime_tests {
564 use super::*;
565 use crate::{IntoView, ModelExt, QueuePolicy, View};
566 use futures::channel::oneshot;
567 use gpui::{Entity, Task, TestAppContext, Window, div};
568 use std::collections::VecDeque;
569 use std::sync::{Arc, Mutex};
570
/// Message type shared by the simple test models in this module.
#[derive(Debug, Clone, PartialEq)]
enum Msg {
    /// Carries an integer payload for the receiving model.
    Set(i32),
}
575
/// Minimal model used by the tests: a single integer of state.
struct TestModel {
    /// Last value stored by a `Msg::Set`.
    state: i32,
}
579
580 impl Model for TestModel {
581 type Msg = Msg;
582
583 fn update(
584 &mut self,
585 msg: Self::Msg,
586 _cx: &mut App,
587 _scope: &ModelContext<Self::Msg>,
588 ) -> Command<Self::Msg> {
589 match msg {
590 Msg::Set(value) => {
591 self.state = value;
592 Command::none()
593 }
594 }
595 }
596
597 fn view(
598 &self,
599 _window: &mut Window,
600 _cx: &mut App,
601 _scope: &ModelContext<Self::Msg>,
602 _dispatcher: &Dispatcher<Self::Msg>,
603 ) -> View {
604 div().into_view()
605 }
606 }
607
608 #[gpui::test]
609 fn stale_keyed_completion_is_ignored(cx: &mut TestAppContext) {
610 let events = Arc::new(Mutex::new(Vec::new()));
611 let config = ProgramConfig::default()
612 .describe_message(|msg: &Msg| match msg {
613 Msg::Set(value) => format!("set:{value}"),
614 })
615 .describe_key(|key| format!("{key:?}"))
616 .observer({
617 let events = events.clone();
618 move |event: RuntimeEvent<'_, Msg>| {
619 if let RuntimeEvent::StaleKeyedCompletionIgnored {
620 message_description,
621 ..
622 } = event
623 {
624 events
625 .lock()
626 .unwrap()
627 .push(message_description.map(|value| value.to_string()));
628 }
629 }
630 });
631
632 let program: Entity<Program<TestModel>> =
633 cx.update(|cx| TestModel { state: 0 }.into_program_with(config, cx));
634 let key = Key::new("stale");
635
636 program.update(cx, |program: &mut Program<TestModel>, cx| {
637 let new_generation = program.runtime.tasks.next_generation();
638 program.runtime.tasks.insert(
639 key.clone(),
640 new_generation,
641 CommandMeta::new(CommandKind::Foreground, Some(Arc::from("current"))),
642 Task::ready(()),
643 );
644
645 let stale_meta = CommandMeta::new(CommandKind::Foreground, Some(Arc::from("stale")));
646 program.apply_completion(
647 Some(Msg::Set(7)),
648 &stale_meta,
649 Some(KeyedEffectContext {
650 key: key.clone(),
651 generation: new_generation.previous(),
652 }),
653 cx,
654 );
655 });
656
657 program.read_with(cx, |program: &Program<TestModel>, _cx| {
658 assert_eq!(program.model().state, 0);
659 });
660 assert_eq!(
661 events.lock().unwrap().as_slice(),
662 &[Some(String::from("set:7"))]
663 );
664 }
665
666 #[gpui::test]
667 fn runtime_snapshot_reports_current_counts_without_side_effects(cx: &mut TestAppContext) {
668 let program: Entity<Program<TestModel>> =
669 cx.update(|cx| TestModel { state: 0 }.into_program(cx));
670
671 let before = program.read_with(cx, |program, _cx| program.runtime_snapshot());
672 let after = program.read_with(cx, |program, _cx| program.runtime_snapshot());
673
674 assert_eq!(before, after);
675 assert_eq!(before.queue_depth, 0);
676 assert!(!before.is_draining);
677 assert_eq!(before.active_keyed_tasks, 0);
678 assert_eq!(before.active_subscriptions, 0);
679 }
680
681 #[gpui::test]
682 fn internal_enqueue_respects_reject_new_policy_without_extra_notify(cx: &mut TestAppContext) {
683 struct RejectModel {
684 renders: Arc<Mutex<usize>>,
685 }
686
687 impl Model for RejectModel {
688 type Msg = Msg;
689
690 fn update(
691 &mut self,
692 msg: Self::Msg,
693 _cx: &mut App,
694 _scope: &ModelContext<Self::Msg>,
695 ) -> Command<Self::Msg> {
696 match msg {
697 Msg::Set(0) => {
698 Command::batch([Command::emit(Msg::Set(1)), Command::emit(Msg::Set(2))])
699 }
700 Msg::Set(_) => Command::none(),
701 }
702 }
703
704 fn view(
705 &self,
706 _window: &mut Window,
707 _cx: &mut App,
708 _scope: &ModelContext<Self::Msg>,
709 _dispatcher: &Dispatcher<Self::Msg>,
710 ) -> View {
711 *self.renders.lock().unwrap() += 1;
712 div().into_view()
713 }
714 }
715
716 let renders = Arc::new(Mutex::new(0));
717 let program: Entity<Program<RejectModel>> = cx.update(|cx| {
718 RejectModel {
719 renders: renders.clone(),
720 }
721 .into_program_with(
722 ProgramConfig::default().queue_policy(QueuePolicy::RejectNew { capacity: 1 }),
723 cx,
724 )
725 });
726 let dispatcher = program.read_with(cx, |program, _cx| program.dispatcher());
727
728 dispatcher.dispatch(Msg::Set(0)).unwrap();
729 cx.run_until_parked();
730
731 let snapshot = program.read_with(cx, |program, _cx| program.runtime_snapshot());
732 assert_eq!(snapshot.queue_depth, 0);
733 }
734
735 #[gpui::test]
736 fn cancel_key_clears_tracked_task_and_emits_telemetry(cx: &mut TestAppContext) {
737 #[derive(Clone, Debug, PartialEq, Eq)]
738 enum CancelMsg {
739 RunNext,
740 Loaded(i32),
741 }
742
743 #[derive(Clone, Debug, PartialEq, Eq)]
744 enum CancelEvent {
745 Canceled(Option<String>),
746 Stale(Option<String>),
747 }
748
749 struct CancelModel {
750 commands: VecDeque<Command<CancelMsg>>,
751 values: Vec<i32>,
752 }
753
754 impl Model for CancelModel {
755 type Msg = CancelMsg;
756
757 fn update(
758 &mut self,
759 msg: Self::Msg,
760 _cx: &mut App,
761 _scope: &ModelContext<Self::Msg>,
762 ) -> Command<Self::Msg> {
763 match msg {
764 CancelMsg::RunNext => self.commands.pop_front().unwrap_or_else(Command::none),
765 CancelMsg::Loaded(value) => {
766 self.values.push(value);
767 Command::none()
768 }
769 }
770 }
771
772 fn view(
773 &self,
774 _window: &mut Window,
775 _cx: &mut App,
776 _scope: &ModelContext<Self::Msg>,
777 _dispatcher: &Dispatcher<Self::Msg>,
778 ) -> View {
779 div().into_view()
780 }
781 }
782
783 let events = Arc::new(Mutex::new(Vec::new()));
784 let (sender, receiver) = oneshot::channel();
785 let config = ProgramConfig::default()
786 .describe_message(|msg: &CancelMsg| match msg {
787 CancelMsg::RunNext => String::from("run-next"),
788 CancelMsg::Loaded(value) => format!("loaded:{value}"),
789 })
790 .describe_key(|key| format!("{key:?}"))
791 .telemetry_observer({
792 let events = events.clone();
793 move |envelope| match envelope.event {
794 TelemetryEvent::KeyedCommandCanceled {
795 key_description, ..
796 } => events.lock().unwrap().push(CancelEvent::Canceled(
797 key_description.map(|value| value.to_string()),
798 )),
799 TelemetryEvent::StaleKeyedCompletionIgnored {
800 message_description,
801 ..
802 } => events.lock().unwrap().push(CancelEvent::Stale(
803 message_description.map(|value| value.to_string()),
804 )),
805 _ => {}
806 }
807 });
808
809 let program: Entity<Program<CancelModel>> = cx.update(|cx| {
810 CancelModel {
811 commands: VecDeque::from([
812 Command::background_keyed("load", move |_| async move {
813 receiver.await.ok().map(CancelMsg::Loaded)
814 })
815 .label("load"),
816 Command::cancel_key("load"),
817 ]),
818 values: Vec::new(),
819 }
820 .into_program_with(config, cx)
821 });
822 let dispatcher = program.read_with(cx, |program, _cx| program.dispatcher());
823
824 dispatcher.dispatch(CancelMsg::RunNext).unwrap();
825 cx.run_until_parked();
826 dispatcher.dispatch(CancelMsg::RunNext).unwrap();
827 sender.send(8).unwrap();
828 cx.run_until_parked();
829
830 program.read_with(cx, |program, _cx| {
831 assert!(program.model().values.is_empty());
832 assert_eq!(program.runtime_snapshot().active_keyed_tasks, 0);
833 });
834 assert_eq!(
835 events.lock().unwrap().as_slice(),
836 &[
837 CancelEvent::Canceled(Some(String::from("Key(\"load\")"))),
838 CancelEvent::Stale(Some(String::from("loaded:8"))),
839 ]
840 );
841 }
842}