// gpui_tea/program.rs

1use crate::command::{CommandInner, Effect};
2use crate::keyed_tasks::{CommandMeta, KeyedEffectContext};
3use crate::model::Model;
4use crate::observability::{
5    Observability, ProgramConfig, QueueOverflowAction, RuntimeEvent, TelemetryEvent,
6};
7use crate::queue::{QueueReservation, QueueTracker, QueuedMessage};
8use crate::runtime::Runtime;
9use crate::{Command, CommandKind, Dispatcher, Key, ModelContext};
10use futures::StreamExt;
11use gpui::{App, AppContext, AsyncApp, Context, Entity, IntoElement, Render, Task, Window};
12use std::{fmt, sync::Arc};
13
/// Point-in-time copy of the runtime bookkeeping kept for a mounted [`Program`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RuntimeSnapshot {
    /// Queue depth at the moment the snapshot was taken.
    pub queue_depth: usize,
    /// Whether the runtime was in the middle of draining queued messages.
    pub is_draining: bool,
    /// Number of keyed effects tracked by the runtime.
    pub active_keyed_tasks: usize,
    /// Number of subscriptions held by the runtime.
    pub active_subscriptions: usize,
}
26
27/// Host a mounted [`Model`] inside the GPUI runtime.
28pub struct Program<M: Model> {
29    model: M,
30    runtime: Runtime<M::Msg>,
31}
32
33impl<M> fmt::Debug for Program<M>
34where
35    M: Model + fmt::Debug,
36{
37    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
38        formatter
39            .debug_struct("Program")
40            .field("model", &self.model)
41            .finish_non_exhaustive()
42    }
43}
44
45impl<M: Model> Program<M> {
    /// Assemble the [`Runtime`] for a program that is being mounted.
    ///
    /// Wires together the queue tracker, the observability layer, the dispatcher, and a spawned
    /// task that forwards every message received on the dispatcher's channel to
    /// `receive_enqueued`.
    ///
    /// # Arguments
    ///
    /// * `config` - The program configuration; its queue policy seeds the queue tracker.
    /// * `cx` - The entity context of the program being created.
    ///
    /// # Returns
    ///
    /// The runtime built from the dispatcher, queue tracker, receive task, and observability.
    fn build_runtime(config: ProgramConfig<M::Msg>, cx: &mut Context<'_, Self>) -> Runtime<M::Msg> {
        // Read the policy before `config` is moved into `Observability::new` below.
        let queue_policy = config.queue_policy;
        let queue_tracker = Arc::new(QueueTracker::new(queue_policy));
        let observability = Observability::new(
            config,
            // Depth probe: a closure over its own tracker handle that reports the current depth.
            Arc::new({
                let queue_tracker = queue_tracker.clone();
                move || queue_tracker.depth()
            }),
        );
        // The sender half goes into the dispatcher; the receiver half is consumed by the task.
        let (sender, mut receiver) = futures::channel::mpsc::unbounded();
        let dispatcher = Dispatcher::new(sender, queue_tracker.clone(), observability.clone());
        // A weak handle is used so the task can reach the program through `update`.
        let program = cx.weak_entity();
        let receive_task = cx.spawn(move |_this, async_cx: &mut AsyncApp| {
            let mut async_cx = async_cx.clone();
            async move {
                // Runs until the channel yields `None`.
                while let Some(message) = receiver.next().await {
                    program
                        .update(&mut async_cx, |program, cx| {
                            program.receive_enqueued(message, cx);
                        })
                        // A failed `update` is deliberately ignored; the message is discarded.
                        .ok();
                }
            }
        });

        Runtime::new(dispatcher, queue_tracker, receive_task, observability)
    }
74
75    /// Mount `model` with the default [`ProgramConfig`].
76    ///
77    /// # Arguments
78    ///
79    /// * `model` - The initial model state.
80    /// * `cx` - The GPUI application context.
81    ///
82    /// Mounting eagerly calls [`Model::init()`], executes the returned [`Command`], and
83    /// reconciles the initial subscriptions before returning.
84    ///
85    /// # Returns
86    ///
87    /// An [`Entity`] holding the mounted program.
88    pub fn mount(model: M, cx: &mut App) -> Entity<Self> {
89        Self::mount_with(model, ProgramConfig::default(), cx)
90    }
91
92    /// Mount `model` with an explicit [`ProgramConfig`].
93    ///
94    /// # Arguments
95    ///
96    /// * `model` - The initial model state.
97    /// * `config` - The configuration to use for the program runtime.
98    /// * `cx` - The GPUI application context.
99    ///
100    /// Mounting eagerly calls [`Model::init()`], executes the returned [`Command`], and
101    /// reconciles the initial subscriptions before returning.
102    ///
103    /// # Returns
104    ///
105    /// An [`Entity`] holding the mounted program.
106    pub fn mount_with(model: M, config: ProgramConfig<M::Msg>, cx: &mut App) -> Entity<Self> {
107        cx.new(|cx| {
108            let runtime = Self::build_runtime(config, cx);
109            let mut program = Self { model, runtime };
110            let init_command = program.model.init(cx, &ModelContext::root());
111            program.execute_command(init_command, cx);
112            program.reconcile_subscriptions(cx);
113            program
114        })
115    }
116
117    /// Borrow the mounted model state.
118    ///
119    /// # Returns
120    ///
121    /// A reference to the underlying model.
122    pub fn model(&self) -> &M {
123        &self.model
124    }
125
126    /// Capture the current runtime bookkeeping state.
127    ///
128    /// # Returns
129    ///
130    /// A snapshot of the program's runtime.
131    pub fn runtime_snapshot(&self) -> RuntimeSnapshot {
132        RuntimeSnapshot {
133            queue_depth: self.runtime.queue_tracker.depth(),
134            is_draining: self.runtime.queue.is_draining,
135            active_keyed_tasks: self.runtime.tasks.len(),
136            active_subscriptions: self.runtime.subscriptions.len(),
137        }
138    }
139
140    /// Clone the dispatcher associated with this program.
141    ///
142    /// # Returns
143    ///
144    /// A cloned [`Dispatcher`] that sends messages to this program.
145    pub fn dispatcher(&self) -> Dispatcher<M::Msg> {
146        self.runtime.dispatcher.clone()
147    }
148
149    fn receive_enqueued(&mut self, queued: QueuedMessage<M::Msg>, cx: &mut Context<'_, Self>) {
150        if self.runtime.queue_tracker.take_if_dropped(queued.id) {
151            return;
152        }
153
154        self.runtime.queue.pending.push_back(queued);
155        self.observe_queue_warning_if_needed();
156        self.drain_queue(cx);
157    }
158
159    fn enqueue_message(&mut self, message: M::Msg, cx: &mut Context<'_, Self>) {
160        let message_description = self.runtime.observability.describe_message_value(&message);
161        let reservation = self.runtime.queue_tracker.reserve_enqueue();
162
163        match reservation {
164            QueueReservation::Rejected => {
165                self.runtime
166                    .observability
167                    .observe_telemetry(TelemetryEvent::QueueOverflow {
168                        policy: self.runtime.observability.queue_policy(),
169                        action: QueueOverflowAction::RejectedNew,
170                        message_description,
171                    });
172            }
173            QueueReservation::DroppedNewest => {
174                self.runtime
175                    .observability
176                    .observe_telemetry(TelemetryEvent::QueueOverflow {
177                        policy: self.runtime.observability.queue_policy(),
178                        action: QueueOverflowAction::DroppedNewest,
179                        message_description,
180                    });
181            }
182            QueueReservation::Accepted {
183                id,
184                overflow_action,
185                ..
186            } => {
187                if let Some(action) = overflow_action {
188                    self.runtime
189                        .observability
190                        .observe_telemetry(TelemetryEvent::QueueOverflow {
191                            policy: self.runtime.observability.queue_policy(),
192                            action,
193                            message_description: message_description.clone(),
194                        });
195                }
196
197                self.receive_enqueued(QueuedMessage { id, message }, cx);
198            }
199        }
200    }
201
202    fn observe_queue_warning_if_needed(&self) {
203        if let Some(threshold) = self.runtime.observability.queue_warning_threshold() {
204            let queued = self.runtime.queue_tracker.depth();
205            if queued > threshold {
206                self.runtime
207                    .observability
208                    .observe_runtime(RuntimeEvent::QueueWarning { queued, threshold });
209                self.runtime
210                    .observability
211                    .observe_telemetry(TelemetryEvent::QueueWarning { queued, threshold });
212            }
213        }
214    }
215
    /// Process pending messages until the pending queue is empty.
    ///
    /// Each message is passed to [`Model::update`] and the returned command is executed
    /// immediately. Drain-started and drain-finished events are emitted around the loop, and
    /// subscriptions are reconciled and the entity notified once if at least one message was
    /// processed.
    ///
    /// The `is_draining` flag makes nested calls return immediately. Messages enqueued while a
    /// drain is running (for example by `Emit` commands executed inside the loop) are pushed to
    /// `pending` by `receive_enqueued` and consumed by the loop that is already running.
    fn drain_queue(&mut self, cx: &mut Context<'_, Self>) {
        // Re-entrancy guard: an outer drain is already consuming `pending`.
        if self.runtime.queue.is_draining {
            return;
        }

        self.runtime.queue.is_draining = true;
        let queued = self.runtime.queue.pending.len();
        self.runtime
            .observability
            .observe_runtime(RuntimeEvent::QueueDrainStarted { queued });
        self.runtime
            .observability
            .observe_telemetry(TelemetryEvent::QueueDrainStarted { queued });

        let mut processed = 0;

        while let Some(queued) = self.runtime.queue.pending.pop_front() {
            // Skip entries the tracker reports as dropped since they were pushed to `pending`.
            if self.runtime.queue_tracker.take_if_dropped(queued.id) {
                continue;
            }

            // The tracker is told the message is complete before the model update runs.
            self.runtime.queue_tracker.complete_processed(queued.id);
            // Described once and shared by the runtime and telemetry events.
            let message_description = self
                .runtime
                .observability
                .describe_message_value(&queued.message);
            self.runtime
                .observability
                .observe_runtime(RuntimeEvent::MessageProcessed {
                    message: &queued.message,
                    message_description: message_description.clone(),
                });
            self.runtime
                .observability
                .observe_telemetry(TelemetryEvent::MessageProcessed {
                    message: &queued.message,
                    message_description,
                });

            let command = self.model.update(queued.message, cx, &ModelContext::root());
            processed += 1;
            // May enqueue further messages; they land in `pending` and extend this loop.
            self.execute_command(command, cx);
        }

        self.runtime.queue.is_draining = false;

        // Reconcile and notify once per drain, and only if something was processed.
        if processed > 0 {
            self.reconcile_subscriptions(cx);
            cx.notify();
        }

        let remaining = self.runtime.queue.pending.len();
        self.runtime
            .observability
            .observe_runtime(RuntimeEvent::QueueDrainFinished {
                processed,
                remaining,
            });
        self.runtime
            .observability
            .observe_telemetry(TelemetryEvent::QueueDrainFinished {
                processed,
                remaining,
            });
    }
281
282    fn execute_command(&mut self, command: Command<M::Msg>, cx: &mut Context<'_, Self>) {
283        let (label, inner) = command.into_parts();
284
285        match inner {
286            CommandInner::None => {}
287            CommandInner::Emit(message) => {
288                let meta = CommandMeta::new(CommandKind::Emit, label);
289                self.observe_command_scheduled(&meta, None);
290                self.observe_effect_completed(&meta, None, Some(&message));
291                self.enqueue_message(message, cx);
292            }
293            CommandInner::Batch(commands) => {
294                for command in commands {
295                    self.execute_command(command, cx);
296                }
297            }
298            CommandInner::Effect(effect) => {
299                let meta = CommandMeta::new(effect.kind(), label);
300                self.observe_command_scheduled(&meta, None);
301                self.observe_effect_started(&meta, None);
302                Self::spawn_effect(effect, meta, None, cx).detach();
303            }
304            CommandInner::Keyed { key, effect } => {
305                let meta = CommandMeta::new(effect.kind(), label);
306                self.observe_command_scheduled(&meta, Some(&key));
307                self.observe_effect_started(&meta, Some(&key));
308                let generation = self.runtime.tasks.next_generation();
309                let keyed = KeyedEffectContext {
310                    key: key.clone(),
311                    generation,
312                };
313                let task = Self::spawn_effect(effect, meta.clone(), Some(keyed), cx);
314                let previous =
315                    self.runtime
316                        .tasks
317                        .insert(key.clone(), generation, meta.clone(), task);
318
319                if let Some(previous) = previous {
320                    let previous_kind = previous.meta.kind;
321                    let previous_label = previous.meta.label().map(ToOwned::to_owned);
322                    previous.task.detach();
323                    self.runtime.observability.observe_runtime(
324                        RuntimeEvent::KeyedCommandReplaced {
325                            key: &key,
326                            key_description: self.runtime.observability.describe_key_value(&key),
327                            previous_kind,
328                            previous_label: previous_label.as_deref(),
329                            next_kind: meta.kind,
330                            next_label: meta.label(),
331                        },
332                    );
333                    self.runtime.observability.observe_telemetry(
334                        TelemetryEvent::KeyedCommandReplaced {
335                            key: &key,
336                            key_description: self.runtime.observability.describe_key_value(&key),
337                            previous_kind,
338                            previous_label: previous_label.as_deref(),
339                            next_kind: meta.kind,
340                            next_label: meta.label(),
341                        },
342                    );
343                }
344            }
345            CommandInner::Cancel(key) => {
346                self.cancel_keyed_command(&key);
347            }
348        }
349    }
350
351    fn cancel_keyed_command(&mut self, key: &Key) {
352        if let Some(running) = self.runtime.tasks.cancel(key) {
353            let canceled_kind = running.meta.kind;
354            let canceled_label = running.meta.label().map(ToOwned::to_owned);
355            running.task.detach();
356            self.runtime
357                .observability
358                .observe_telemetry(TelemetryEvent::KeyedCommandCanceled {
359                    key,
360                    key_description: self.runtime.observability.describe_key_value(key),
361                    canceled_kind,
362                    canceled_label: canceled_label.as_deref(),
363                });
364        }
365    }
366
367    fn observe_command_scheduled(&self, meta: &CommandMeta, key: Option<&Key>) {
368        self.runtime
369            .observability
370            .observe_runtime(RuntimeEvent::CommandScheduled {
371                kind: meta.kind,
372                label: meta.label(),
373                key,
374                key_description: key
375                    .and_then(|key| self.runtime.observability.describe_key_value(key)),
376            });
377        self.runtime
378            .observability
379            .observe_telemetry(TelemetryEvent::CommandScheduled {
380                kind: meta.kind,
381                label: meta.label(),
382                key,
383                key_description: key
384                    .and_then(|key| self.runtime.observability.describe_key_value(key)),
385            });
386    }
387
388    fn observe_effect_started(&self, meta: &CommandMeta, key: Option<&Key>) {
389        self.runtime
390            .observability
391            .observe_telemetry(TelemetryEvent::EffectStarted {
392                kind: meta.kind,
393                label: meta.label(),
394                key,
395                key_description: key
396                    .and_then(|key| self.runtime.observability.describe_key_value(key)),
397            });
398    }
399
400    fn observe_effect_completed(
401        &self,
402        meta: &CommandMeta,
403        key: Option<&Key>,
404        message: Option<&M::Msg>,
405    ) {
406        let key_description =
407            key.and_then(|key| self.runtime.observability.describe_key_value(key));
408        let message_description =
409            message.and_then(|message| self.runtime.observability.describe_message_value(message));
410
411        self.runtime
412            .observability
413            .observe_runtime(RuntimeEvent::EffectCompleted {
414                kind: meta.kind,
415                label: meta.label(),
416                key,
417                key_description: key_description.clone(),
418                emitted_message: message.is_some(),
419                message,
420                message_description: message_description.clone(),
421            });
422        self.runtime
423            .observability
424            .observe_telemetry(TelemetryEvent::EffectCompleted {
425                kind: meta.kind,
426                label: meta.label(),
427                key,
428                key_description,
429                emitted_message: message.is_some(),
430                message,
431                message_description,
432            });
433    }
434
435    fn apply_completion(
436        &mut self,
437        message: Option<M::Msg>,
438        meta: &CommandMeta,
439        keyed: Option<KeyedEffectContext>,
440        cx: &mut Context<'_, Self>,
441    ) {
442        let key = keyed.as_ref().map(|keyed| &keyed.key);
443        self.observe_effect_completed(meta, key, message.as_ref());
444
445        if let Some(keyed) = keyed {
446            if self.runtime.tasks.is_current(&keyed.key, keyed.generation) {
447                self.runtime
448                    .tasks
449                    .clear_current(&keyed.key, keyed.generation);
450                if let Some(message) = message {
451                    self.enqueue_message(message, cx);
452                }
453            } else {
454                let key_description = self.runtime.observability.describe_key_value(&keyed.key);
455                let message_description = message
456                    .as_ref()
457                    .and_then(|message| self.runtime.observability.describe_message_value(message));
458
459                self.runtime.observability.observe_runtime(
460                    RuntimeEvent::StaleKeyedCompletionIgnored {
461                        kind: meta.kind,
462                        label: meta.label(),
463                        key: &keyed.key,
464                        key_description: key_description.clone(),
465                        emitted_message: message.is_some(),
466                        message: message.as_ref(),
467                        message_description: message_description.clone(),
468                    },
469                );
470                self.runtime.observability.observe_telemetry(
471                    TelemetryEvent::StaleKeyedCompletionIgnored {
472                        kind: meta.kind,
473                        label: meta.label(),
474                        key: &keyed.key,
475                        key_description,
476                        emitted_message: message.is_some(),
477                        message: message.as_ref(),
478                        message_description,
479                    },
480                );
481            }
482        } else if let Some(message) = message {
483            self.enqueue_message(message, cx);
484        }
485    }
486
    /// Spawn `effect` and route its result back into the program.
    ///
    /// Both variants spawn a task through `cx.spawn`, await the effect's optional message, and
    /// then call `apply_completion` on the program through a weak entity handle. A failed
    /// `update` is ignored, in which case the completion is not applied.
    ///
    /// # Arguments
    ///
    /// * `effect` - The foreground or background effect to run.
    /// * `meta` - Kind and label of the originating command, passed on to `apply_completion`.
    /// * `keyed` - Key and generation when the effect is keyed, otherwise `None`.
    /// * `cx` - The program's entity context.
    ///
    /// # Returns
    ///
    /// The spawned [`Task`]; the caller decides whether to detach or store it.
    fn spawn_effect(
        effect: Effect<M::Msg>,
        meta: CommandMeta,
        keyed: Option<KeyedEffectContext>,
        cx: &mut Context<'_, Self>,
    ) -> Task<()> {
        let program = cx.weak_entity();

        match effect {
            // Foreground effects are awaited directly inside the spawned task and receive the
            // async app context.
            Effect::Foreground(effect) => cx.spawn(move |_this, async_cx: &mut AsyncApp| {
                let mut async_cx = async_cx.clone();
                async move {
                    let message = effect(&mut async_cx).await;
                    program
                        .update(&mut async_cx, |program, cx| {
                            program.apply_completion(message, &meta, keyed, cx);
                        })
                        .ok();
                }
            }),
            // Background effects are handed to the background executor; the spawned task only
            // awaits the result and applies it.
            Effect::Background(effect) => {
                let executor = cx.background_executor().clone();
                cx.spawn(move |_this, async_cx: &mut AsyncApp| {
                    let mut async_cx = async_cx.clone();
                    async move {
                        // The effect gets its own executor handle as its argument.
                        let spawned_executor = executor.clone();
                        let message = executor.spawn(effect(spawned_executor)).await;
                        program
                            .update(&mut async_cx, |program, cx| {
                                program.apply_completion(message, &meta, keyed, cx);
                            })
                            .ok();
                    }
                })
            }
        }
    }
524
525    fn reconcile_subscriptions(&mut self, cx: &mut Context<'_, Self>) {
526        let subscriptions = self.model.subscriptions(cx, &ModelContext::root());
527        let dispatcher = self.dispatcher();
528        let stats = self.runtime.subscriptions.reconcile(
529            subscriptions,
530            &dispatcher,
531            &self.runtime.observability,
532            cx,
533        );
534
535        self.runtime
536            .observability
537            .observe_runtime(RuntimeEvent::SubscriptionsReconciled {
538                active: stats.active,
539                added: stats.added,
540                removed: stats.removed,
541                retained: stats.retained,
542            });
543        self.runtime
544            .observability
545            .observe_telemetry(TelemetryEvent::SubscriptionsReconciled {
546                active: stats.active,
547                added: stats.added,
548                removed: stats.removed,
549                retained: stats.retained,
550            });
551    }
552}
553
554impl<M: Model> Render for Program<M> {
555    fn render(&mut self, window: &mut Window, cx: &mut Context<'_, Self>) -> impl IntoElement {
556        let dispatcher = self.dispatcher();
557        self.model
558            .view(window, cx, &ModelContext::root(), &dispatcher)
559    }
560}
561
562#[cfg(test)]
563mod runtime_tests {
564    use super::*;
565    use crate::{IntoView, ModelExt, QueuePolicy, View};
566    use futures::channel::oneshot;
567    use gpui::{Entity, Task, TestAppContext, Window, div};
568    use std::collections::VecDeque;
569    use std::sync::{Arc, Mutex};
570
    /// Message type shared by the simple test models in this module.
    #[derive(Clone, Debug, PartialEq)]
    enum Msg {
        /// Carry an integer payload; `TestModel` stores it as its state.
        Set(i32),
    }
575
    /// Minimal model whose only state is the last value received through `Msg::Set`.
    struct TestModel {
        state: i32,
    }
579
580    impl Model for TestModel {
581        type Msg = Msg;
582
583        fn update(
584            &mut self,
585            msg: Self::Msg,
586            _cx: &mut App,
587            _scope: &ModelContext<Self::Msg>,
588        ) -> Command<Self::Msg> {
589            match msg {
590                Msg::Set(value) => {
591                    self.state = value;
592                    Command::none()
593                }
594            }
595        }
596
597        fn view(
598            &self,
599            _window: &mut Window,
600            _cx: &mut App,
601            _scope: &ModelContext<Self::Msg>,
602            _dispatcher: &Dispatcher<Self::Msg>,
603        ) -> View {
604            div().into_view()
605        }
606    }
607
    /// A completion carrying an older generation than the one tracked for its key must not
    /// reach the model, and must be reported as `StaleKeyedCompletionIgnored`.
    #[gpui::test]
    fn stale_keyed_completion_is_ignored(cx: &mut TestAppContext) {
        // Collects the message description of every stale-completion runtime event.
        let events = Arc::new(Mutex::new(Vec::new()));
        let config = ProgramConfig::default()
            .describe_message(|msg: &Msg| match msg {
                Msg::Set(value) => format!("set:{value}"),
            })
            .describe_key(|key| format!("{key:?}"))
            .observer({
                let events = events.clone();
                move |event: RuntimeEvent<'_, Msg>| {
                    if let RuntimeEvent::StaleKeyedCompletionIgnored {
                        message_description,
                        ..
                    } = event
                    {
                        events
                            .lock()
                            .unwrap()
                            .push(message_description.map(|value| value.to_string()));
                    }
                }
            });

        let program: Entity<Program<TestModel>> =
            cx.update(|cx| TestModel { state: 0 }.into_program_with(config, cx));
        let key = Key::new("stale");

        program.update(cx, |program: &mut Program<TestModel>, cx| {
            // Register a task for the key under a fresh generation.
            let new_generation = program.runtime.tasks.next_generation();
            program.runtime.tasks.insert(
                key.clone(),
                new_generation,
                CommandMeta::new(CommandKind::Foreground, Some(Arc::from("current"))),
                Task::ready(()),
            );

            // Deliver a completion for the same key tagged with the previous generation.
            let stale_meta = CommandMeta::new(CommandKind::Foreground, Some(Arc::from("stale")));
            program.apply_completion(
                Some(Msg::Set(7)),
                &stale_meta,
                Some(KeyedEffectContext {
                    key: key.clone(),
                    generation: new_generation.previous(),
                }),
                cx,
            );
        });

        // The model never saw `Set(7)`.
        program.read_with(cx, |program: &Program<TestModel>, _cx| {
            assert_eq!(program.model().state, 0);
        });
        // Exactly one stale event was observed, carrying the described message.
        assert_eq!(
            events.lock().unwrap().as_slice(),
            &[Some(String::from("set:7"))]
        );
    }
665
    /// Two consecutive snapshots of a freshly mounted program are identical, and every counter
    /// is zero with no drain in progress.
    #[gpui::test]
    fn runtime_snapshot_reports_current_counts_without_side_effects(cx: &mut TestAppContext) {
        let program: Entity<Program<TestModel>> =
            cx.update(|cx| TestModel { state: 0 }.into_program(cx));

        // Taking a snapshot twice checks that reading it does not change the reported state.
        let before = program.read_with(cx, |program, _cx| program.runtime_snapshot());
        let after = program.read_with(cx, |program, _cx| program.runtime_snapshot());

        assert_eq!(before, after);
        assert_eq!(before.queue_depth, 0);
        assert!(!before.is_draining);
        assert_eq!(before.active_keyed_tasks, 0);
        assert_eq!(before.active_subscriptions, 0);
    }
680
    /// Dispatch a message whose update emits two further messages under a `RejectNew` policy
    /// with capacity 1, and check that the queue depth is zero once the scheduler is idle.
    ///
    /// NOTE(review): `renders` is incremented in `view` but never asserted, and no overflow
    /// telemetry is inspected. Despite its name, this test only verifies the final queue depth.
    /// Confirm whether assertions on rejection and notification were intended.
    #[gpui::test]
    fn internal_enqueue_respects_reject_new_policy_without_extra_notify(cx: &mut TestAppContext) {
        /// Model that counts `view` calls and fans `Set(0)` out into two emitted messages.
        struct RejectModel {
            renders: Arc<Mutex<usize>>,
        }

        impl Model for RejectModel {
            type Msg = Msg;

            fn update(
                &mut self,
                msg: Self::Msg,
                _cx: &mut App,
                _scope: &ModelContext<Self::Msg>,
            ) -> Command<Self::Msg> {
                match msg {
                    // Emitting two messages at once is what exercises the capacity-1 queue.
                    Msg::Set(0) => {
                        Command::batch([Command::emit(Msg::Set(1)), Command::emit(Msg::Set(2))])
                    }
                    Msg::Set(_) => Command::none(),
                }
            }

            fn view(
                &self,
                _window: &mut Window,
                _cx: &mut App,
                _scope: &ModelContext<Self::Msg>,
                _dispatcher: &Dispatcher<Self::Msg>,
            ) -> View {
                *self.renders.lock().unwrap() += 1;
                div().into_view()
            }
        }

        let renders = Arc::new(Mutex::new(0));
        let program: Entity<Program<RejectModel>> = cx.update(|cx| {
            RejectModel {
                renders: renders.clone(),
            }
            .into_program_with(
                ProgramConfig::default().queue_policy(QueuePolicy::RejectNew { capacity: 1 }),
                cx,
            )
        });
        let dispatcher = program.read_with(cx, |program, _cx| program.dispatcher());

        dispatcher.dispatch(Msg::Set(0)).unwrap();
        // Let the receive task and any follow-up work run to completion.
        cx.run_until_parked();

        let snapshot = program.read_with(cx, |program, _cx| program.runtime_snapshot());
        assert_eq!(snapshot.queue_depth, 0);
    }
734
    /// Start a keyed background effect, cancel its key, then let the effect finish.
    ///
    /// Expected outcome: the model never receives the effect's message, no keyed task remains
    /// tracked, and telemetry reports the cancellation followed by the stale completion.
    #[gpui::test]
    fn cancel_key_clears_tracked_task_and_emits_telemetry(cx: &mut TestAppContext) {
        #[derive(Clone, Debug, PartialEq, Eq)]
        enum CancelMsg {
            /// Ask the model to return its next scripted command.
            RunNext,
            /// Result of the background load.
            Loaded(i32),
        }

        /// Telemetry events recorded by the test, reduced to their description strings.
        #[derive(Clone, Debug, PartialEq, Eq)]
        enum CancelEvent {
            Canceled(Option<String>),
            Stale(Option<String>),
        }

        /// Model that replays a scripted list of commands and records loaded values.
        struct CancelModel {
            commands: VecDeque<Command<CancelMsg>>,
            values: Vec<i32>,
        }

        impl Model for CancelModel {
            type Msg = CancelMsg;

            fn update(
                &mut self,
                msg: Self::Msg,
                _cx: &mut App,
                _scope: &ModelContext<Self::Msg>,
            ) -> Command<Self::Msg> {
                match msg {
                    // Each `RunNext` pops one scripted command; an empty script yields no command.
                    CancelMsg::RunNext => self.commands.pop_front().unwrap_or_else(Command::none),
                    CancelMsg::Loaded(value) => {
                        self.values.push(value);
                        Command::none()
                    }
                }
            }

            fn view(
                &self,
                _window: &mut Window,
                _cx: &mut App,
                _scope: &ModelContext<Self::Msg>,
                _dispatcher: &Dispatcher<Self::Msg>,
            ) -> View {
                div().into_view()
            }
        }

        let events = Arc::new(Mutex::new(Vec::new()));
        // The background effect blocks on this channel so the test controls when it completes.
        let (sender, receiver) = oneshot::channel();
        let config = ProgramConfig::default()
            .describe_message(|msg: &CancelMsg| match msg {
                CancelMsg::RunNext => String::from("run-next"),
                CancelMsg::Loaded(value) => format!("loaded:{value}"),
            })
            .describe_key(|key| format!("{key:?}"))
            .telemetry_observer({
                let events = events.clone();
                // Record only cancellation and stale-completion telemetry, in arrival order.
                move |envelope| match envelope.event {
                    TelemetryEvent::KeyedCommandCanceled {
                        key_description, ..
                    } => events.lock().unwrap().push(CancelEvent::Canceled(
                        key_description.map(|value| value.to_string()),
                    )),
                    TelemetryEvent::StaleKeyedCompletionIgnored {
                        message_description,
                        ..
                    } => events.lock().unwrap().push(CancelEvent::Stale(
                        message_description.map(|value| value.to_string()),
                    )),
                    _ => {}
                }
            });

        let program: Entity<Program<CancelModel>> = cx.update(|cx| {
            CancelModel {
                // Script: first start the keyed load, then cancel the same key.
                commands: VecDeque::from([
                    Command::background_keyed("load", move |_| async move {
                        receiver.await.ok().map(CancelMsg::Loaded)
                    })
                    .label("load"),
                    Command::cancel_key("load"),
                ]),
                values: Vec::new(),
            }
            .into_program_with(config, cx)
        });
        let dispatcher = program.read_with(cx, |program, _cx| program.dispatcher());

        // First `RunNext` starts the keyed effect, which then waits on the channel.
        dispatcher.dispatch(CancelMsg::RunNext).unwrap();
        cx.run_until_parked();
        // Second `RunNext` cancels the key; only afterwards is the effect allowed to finish.
        dispatcher.dispatch(CancelMsg::RunNext).unwrap();
        sender.send(8).unwrap();
        cx.run_until_parked();

        program.read_with(cx, |program, _cx| {
            assert!(program.model().values.is_empty());
            assert_eq!(program.runtime_snapshot().active_keyed_tasks, 0);
        });
        assert_eq!(
            events.lock().unwrap().as_slice(),
            &[
                CancelEvent::Canceled(Some(String::from("Key(\"load\")"))),
                CancelEvent::Stale(Some(String::from("loaded:8"))),
            ]
        );
    }
842}